e2e: move funcs of framework/statefulset to e2e/apps & e2e/upgrades

Signed-off-by: clarklee92 <clarklee1992@hotmail.com>
Author: clarklee92
Date: 2019-12-12 00:31:35 +08:00
parent eef4c00ae9
commit 623c4f9f17
8 changed files with 361 additions and 337 deletions
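For orientation, the mechanical shape of this refactor as it repeats through the hunks below: helpers that were exported from the shared framework/statefulset package become unexported locals of test/e2e/apps, so call sites simply drop the e2esset qualifier. An illustrative before/after sketch (names taken from this diff):

// Before: shared, exported framework helpers.
ss = e2esset.WaitForStatus(c, ss)
e2esset.DeleteStatefulPodAtIndex(c, 0, ss)

// After: package-local, unexported helpers in test/e2e/apps.
ss = waitForStatus(c, ss)
deleteStatefulPodAtIndex(c, 0, ss)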


@@ -20,6 +20,7 @@ go_library(
"replica_set.go",
"statefulset.go",
"types.go",
"wait.go",
],
importpath = "k8s.io/kubernetes/test/e2e/apps",
deps = [


@@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
@@ -52,8 +53,27 @@ const (
// Timeout for reads from databases running on stateful pods.
readTimeout = 60 * time.Second
// statefulSetPoll is a poll interval for StatefulSet tests
statefulSetPoll = 10 * time.Second
// statefulSetTimeout is a timeout interval for StatefulSet operations
statefulSetTimeout = 10 * time.Minute
// statefulPodTimeout is a timeout for stateful pods to change state
statefulPodTimeout = 5 * time.Minute
)
var httpProbe = &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80},
},
},
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 1,
}
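The httpProbe above is what the helpers added later in this file toggle: breaking readiness just moves /index.html out of the Apache webroot so the probe's GET fails, and restoring moves it back. A minimal runnable sketch of the command pair (the paths and the `|| true` idempotency guard match the diff; the standalone wrapper itself is illustrative only):

package main

import "fmt"

// probeToggleCommands returns the shell commands the e2e helpers exec in
// stateful pods to break and restore the HTTP readiness probe.
func probeToggleCommands(path string) (breakCmd, restoreCmd string) {
	breakCmd = fmt.Sprintf("mv -v /usr/local/apache2/htdocs%v /tmp/ || true", path)
	restoreCmd = fmt.Sprintf("mv -v /tmp%v /usr/local/apache2/htdocs/ || true", path)
	return breakCmd, restoreCmd
}

func main() {
	b, r := probeToggleCommands("/index.html")
	fmt.Println(b) // mv -v /usr/local/apache2/htdocs/index.html /tmp/ || true
	fmt.Println(r) // mv -v /tmp/index.html /usr/local/apache2/htdocs/ || true
}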
// GCE Quota requirements: 3 PDs (persistent disks), one per stateful pod manifest declared above.
// GCE API requirements: nodes and master need storage r/w permissions.
var _ = SIGDescribe("StatefulSet", func() {
@@ -168,7 +188,7 @@ var _ = SIGDescribe("StatefulSet", func() {
})
ginkgo.By("Checking that the stateful set readopts the pod")
gomega.Expect(e2epod.WaitForPodCondition(c, pod.Namespace, pod.Name, "adopted", e2esset.StatefulSetTimeout,
gomega.Expect(e2epod.WaitForPodCondition(c, pod.Namespace, pod.Name, "adopted", statefulSetTimeout,
func(pod *v1.Pod) (bool, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
@@ -188,7 +208,7 @@ var _ = SIGDescribe("StatefulSet", func() {
})
ginkgo.By("Checking that the stateful set releases the pod")
gomega.Expect(e2epod.WaitForPodCondition(c, pod.Namespace, pod.Name, "released", e2esset.StatefulSetTimeout,
gomega.Expect(e2epod.WaitForPodCondition(c, pod.Namespace, pod.Name, "released", statefulSetTimeout,
func(pod *v1.Pod) (bool, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef != nil {
@@ -205,7 +225,7 @@ var _ = SIGDescribe("StatefulSet", func() {
})
ginkgo.By("Checking that the stateful set readopts the pod")
gomega.Expect(e2epod.WaitForPodCondition(c, pod.Namespace, pod.Name, "adopted", e2esset.StatefulSetTimeout,
gomega.Expect(e2epod.WaitForPodCondition(c, pod.Namespace, pod.Name, "adopted", statefulSetTimeout,
func(pod *v1.Pod) (bool, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
@@ -243,7 +263,7 @@ var _ = SIGDescribe("StatefulSet", func() {
// we set the healthy bit.
ginkgo.By("Deleting healthy stateful pod at index 0.")
e2esset.DeleteStatefulPodAtIndex(c, 0, ss)
deleteStatefulPodAtIndex(c, 0, ss)
ginkgo.By("Confirming stateful pod at index 0 is recreated.")
e2esset.WaitForRunning(c, 2, 1, ss)
@@ -283,7 +303,7 @@ var _ = SIGDescribe("StatefulSet", func() {
framework.ConformanceIt("should perform canary updates and phased rolling updates of template modifications", func() {
ginkgo.By("Creating a new StatefulSet")
ss := e2esset.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
e2esset.SetHTTPProbe(ss)
setHTTPProbe(ss)
ss.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *appsv1.RollingUpdateStatefulSetStrategy {
@@ -297,7 +317,7 @@ var _ = SIGDescribe("StatefulSet", func() {
ss, err := c.AppsV1().StatefulSets(ns).Create(ss)
framework.ExpectNoError(err)
e2esset.WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)
ss = e2esset.WaitForStatus(c, ss)
ss = waitForStatus(c, ss)
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
framework.ExpectEqual(currentRevision, updateRevision, fmt.Sprintf("StatefulSet %s/%s created with update revision %s not equal to current revision %s",
ss.Namespace, ss.Name, updateRevision, currentRevision))
@@ -314,13 +334,13 @@ var _ = SIGDescribe("StatefulSet", func() {
ginkgo.By(fmt.Sprintf("Updating stateful set template: update image from %s to %s", oldImage, newImage))
framework.ExpectNotEqual(oldImage, newImage, "Incorrect test setup: should update to a different image")
ss, err = e2esset.UpdateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
ss, err = updateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
update.Spec.Template.Spec.Containers[0].Image = newImage
})
framework.ExpectNoError(err)
ginkgo.By("Creating a new revision")
ss = e2esset.WaitForStatus(c, ss)
ss = waitForStatus(c, ss)
currentRevision, updateRevision = ss.Status.CurrentRevision, ss.Status.UpdateRevision
framework.ExpectNotEqual(currentRevision, updateRevision, "Current revision should not equal update revision during rolling update")
@@ -349,7 +369,7 @@ var _ = SIGDescribe("StatefulSet", func() {
}()}
}(),
}
ss, err = e2esset.UpdateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
ss, err = updateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
update.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *appsv1.RollingUpdateStatefulSetStrategy {
@@ -362,7 +382,7 @@ var _ = SIGDescribe("StatefulSet", func() {
}
})
framework.ExpectNoError(err)
ss, pods = e2esset.WaitForPartitionedRollingUpdate(c, ss)
ss, pods = waitForPartitionedRollingUpdate(c, ss)
for i := range pods.Items {
if i < int(*ss.Spec.UpdateStrategy.RollingUpdate.Partition) {
framework.ExpectEqual(pods.Items[i].Spec.Containers[0].Image, oldImage, fmt.Sprintf("Pod %s/%s has image %s not equal to current image %s",
@@ -390,10 +410,10 @@ var _ = SIGDescribe("StatefulSet", func() {
}
ginkgo.By("Restoring Pods to the correct revision when they are deleted")
e2esset.DeleteStatefulPodAtIndex(c, 0, ss)
e2esset.DeleteStatefulPodAtIndex(c, 2, ss)
deleteStatefulPodAtIndex(c, 0, ss)
deleteStatefulPodAtIndex(c, 2, ss)
e2esset.WaitForRunningAndReady(c, 3, ss)
ss = e2esset.GetStatefulSet(c, ss.Namespace, ss.Name)
ss = getStatefulSet(c, ss.Namespace, ss.Name)
pods = e2esset.GetPodList(c, ss)
for i := range pods.Items {
if i < int(*ss.Spec.UpdateStrategy.RollingUpdate.Partition) {
@@ -423,7 +443,7 @@ var _ = SIGDescribe("StatefulSet", func() {
ginkgo.By("Performing a phased rolling update")
for i := int(*ss.Spec.UpdateStrategy.RollingUpdate.Partition) - 1; i >= 0; i-- {
ss, err = e2esset.UpdateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
ss, err = updateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
update.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *appsv1.RollingUpdateStatefulSetStrategy {
@@ -435,7 +455,7 @@ var _ = SIGDescribe("StatefulSet", func() {
}
})
framework.ExpectNoError(err)
ss, pods = e2esset.WaitForPartitionedRollingUpdate(c, ss)
ss, pods = waitForPartitionedRollingUpdate(c, ss)
for i := range pods.Items {
if i < int(*ss.Spec.UpdateStrategy.RollingUpdate.Partition) {
framework.ExpectEqual(pods.Items[i].Spec.Containers[0].Image, oldImage, fmt.Sprintf("Pod %s/%s has image %s not equal to current image %s",
@@ -475,14 +495,14 @@ var _ = SIGDescribe("StatefulSet", func() {
ginkgo.It("should implement legacy replacement when the update strategy is OnDelete", func() {
ginkgo.By("Creating a new StatefulSet")
ss := e2esset.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
e2esset.SetHTTPProbe(ss)
setHTTPProbe(ss)
ss.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
Type: appsv1.OnDeleteStatefulSetStrategyType,
}
ss, err := c.AppsV1().StatefulSets(ns).Create(ss)
framework.ExpectNoError(err)
e2esset.WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)
ss = e2esset.WaitForStatus(c, ss)
ss = waitForStatus(c, ss)
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
framework.ExpectEqual(currentRevision, updateRevision, fmt.Sprintf("StatefulSet %s/%s created with update revision %s not equal to current revision %s",
ss.Namespace, ss.Name, updateRevision, currentRevision))
@@ -496,11 +516,11 @@ var _ = SIGDescribe("StatefulSet", func() {
}
ginkgo.By("Restoring Pods to the current revision")
e2esset.DeleteStatefulPodAtIndex(c, 0, ss)
e2esset.DeleteStatefulPodAtIndex(c, 1, ss)
e2esset.DeleteStatefulPodAtIndex(c, 2, ss)
deleteStatefulPodAtIndex(c, 0, ss)
deleteStatefulPodAtIndex(c, 1, ss)
deleteStatefulPodAtIndex(c, 2, ss)
e2esset.WaitForRunningAndReady(c, 3, ss)
ss = e2esset.GetStatefulSet(c, ss.Namespace, ss.Name)
ss = getStatefulSet(c, ss.Namespace, ss.Name)
pods = e2esset.GetPodList(c, ss)
for i := range pods.Items {
framework.ExpectEqual(pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel], currentRevision, fmt.Sprintf("Pod %s/%s revision %s is not equal to current revision %s",
@@ -514,22 +534,22 @@ var _ = SIGDescribe("StatefulSet", func() {
ginkgo.By(fmt.Sprintf("Updating stateful set template: update image from %s to %s", oldImage, newImage))
framework.ExpectNotEqual(oldImage, newImage, "Incorrect test setup: should update to a different image")
ss, err = e2esset.UpdateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
ss, err = updateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
update.Spec.Template.Spec.Containers[0].Image = newImage
})
framework.ExpectNoError(err)
ginkgo.By("Creating a new revision")
ss = e2esset.WaitForStatus(c, ss)
ss = waitForStatus(c, ss)
currentRevision, updateRevision = ss.Status.CurrentRevision, ss.Status.UpdateRevision
framework.ExpectNotEqual(currentRevision, updateRevision, "Current revision should not equal update revision during rolling update")
ginkgo.By("Recreating Pods at the new revision")
e2esset.DeleteStatefulPodAtIndex(c, 0, ss)
e2esset.DeleteStatefulPodAtIndex(c, 1, ss)
e2esset.DeleteStatefulPodAtIndex(c, 2, ss)
deleteStatefulPodAtIndex(c, 0, ss)
deleteStatefulPodAtIndex(c, 1, ss)
deleteStatefulPodAtIndex(c, 2, ss)
e2esset.WaitForRunningAndReady(c, 3, ss)
ss = e2esset.GetStatefulSet(c, ss.Namespace, ss.Name)
ss = getStatefulSet(c, ss.Namespace, ss.Name)
pods = e2esset.GetPodList(c, ss)
for i := range pods.Items {
framework.ExpectEqual(pods.Items[i].Spec.Containers[0].Image, newImage, fmt.Sprintf("Pod %s/%s has image %s not equal to new image %s",
@@ -560,7 +580,7 @@ var _ = SIGDescribe("StatefulSet", func() {
ginkgo.By("Creating stateful set " + ssName + " in namespace " + ns)
ss := e2esset.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
e2esset.SetHTTPProbe(ss)
setHTTPProbe(ss)
ss, err = c.AppsV1().StatefulSets(ns).Create(ss)
framework.ExpectNoError(err)
@@ -568,19 +588,19 @@ var _ = SIGDescribe("StatefulSet", func() {
e2esset.WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)
ginkgo.By("Confirming that stateful set scale up will halt with unhealthy stateful pod")
e2esset.BreakHTTPProbe(c, ss)
e2esset.WaitForRunningAndNotReady(c, *ss.Spec.Replicas, ss)
breakHTTPProbe(c, ss)
waitForRunningAndNotReady(c, *ss.Spec.Replicas, ss)
e2esset.WaitForStatusReadyReplicas(c, ss, 0)
e2esset.UpdateReplicas(c, ss, 3)
confirmStatefulPodCount(c, 1, ss, 10*time.Second, true)
ginkgo.By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
e2esset.RestoreHTTPProbe(c, ss)
restoreHTTPProbe(c, ss)
e2esset.WaitForRunningAndReady(c, 3, ss)
ginkgo.By("Verifying that stateful set " + ssName + " was scaled up in order")
expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), e2esset.StatefulSetTimeout)
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulSetTimeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Added {
@@ -601,19 +621,19 @@ var _ = SIGDescribe("StatefulSet", func() {
})
framework.ExpectNoError(err)
e2esset.BreakHTTPProbe(c, ss)
breakHTTPProbe(c, ss)
e2esset.WaitForStatusReadyReplicas(c, ss, 0)
e2esset.WaitForRunningAndNotReady(c, 3, ss)
waitForRunningAndNotReady(c, 3, ss)
e2esset.UpdateReplicas(c, ss, 0)
confirmStatefulPodCount(c, 3, ss, 10*time.Second, true)
ginkgo.By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
e2esset.RestoreHTTPProbe(c, ss)
restoreHTTPProbe(c, ss)
e2esset.Scale(c, ss, 0)
ginkgo.By("Verifying that stateful set " + ssName + " was scaled down in reverse order")
expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"}
ctx, cancel = watchtools.ContextWithOptionalTimeout(context.Background(), e2esset.StatefulSetTimeout)
ctx, cancel = watchtools.ContextWithOptionalTimeout(context.Background(), statefulSetTimeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Deleted {
@@ -640,7 +660,7 @@ var _ = SIGDescribe("StatefulSet", func() {
ginkgo.By("Creating stateful set " + ssName + " in namespace " + ns)
ss := e2esset.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
ss.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
e2esset.SetHTTPProbe(ss)
setHTTPProbe(ss)
ss, err := c.AppsV1().StatefulSets(ns).Create(ss)
framework.ExpectNoError(err)
@@ -648,25 +668,25 @@ var _ = SIGDescribe("StatefulSet", func() {
e2esset.WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)
ginkgo.By("Confirming that stateful set scale up will not halt with unhealthy stateful pod")
e2esset.BreakHTTPProbe(c, ss)
e2esset.WaitForRunningAndNotReady(c, *ss.Spec.Replicas, ss)
breakHTTPProbe(c, ss)
waitForRunningAndNotReady(c, *ss.Spec.Replicas, ss)
e2esset.WaitForStatusReadyReplicas(c, ss, 0)
e2esset.UpdateReplicas(c, ss, 3)
confirmStatefulPodCount(c, 3, ss, 10*time.Second, false)
ginkgo.By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
e2esset.RestoreHTTPProbe(c, ss)
restoreHTTPProbe(c, ss)
e2esset.WaitForRunningAndReady(c, 3, ss)
ginkgo.By("Scale down will not halt with unhealthy stateful pod")
e2esset.BreakHTTPProbe(c, ss)
breakHTTPProbe(c, ss)
e2esset.WaitForStatusReadyReplicas(c, ss, 0)
e2esset.WaitForRunningAndNotReady(c, 3, ss)
waitForRunningAndNotReady(c, 3, ss)
e2esset.UpdateReplicas(c, ss, 0)
confirmStatefulPodCount(c, 0, ss, 10*time.Second, false)
ginkgo.By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
e2esset.RestoreHTTPProbe(c, ss)
restoreHTTPProbe(c, ss)
e2esset.Scale(c, ss, 0)
e2esset.WaitForStatusReplicas(c, ss, 0)
})
@@ -720,7 +740,7 @@ var _ = SIGDescribe("StatefulSet", func() {
ginkgo.By("Waiting until stateful pod " + statefulPodName + " will be recreated and deleted at least once in namespace " + f.Namespace.Name)
w, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName}))
framework.ExpectNoError(err)
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), e2esset.StatefulPodTimeout)
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulPodTimeout)
defer cancel()
// we need to get the UID of the pod in any state and wait until the stateful set controller removes the pod at least once
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
@@ -759,7 +779,7 @@ var _ = SIGDescribe("StatefulSet", func() {
return fmt.Errorf("pod %v wasn't recreated: %v == %v", statefulPod.Name, statefulPod.UID, initialStatefulPodUID)
}
return nil
}, e2esset.StatefulPodTimeout, 2*time.Second).Should(gomega.BeNil())
}, statefulPodTimeout, 2*time.Second).Should(gomega.BeNil())
})
/*
@@ -772,11 +792,11 @@ var _ = SIGDescribe("StatefulSet", func() {
framework.ConformanceIt("should have a working scale subresource", func() {
ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
ss := e2esset.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, labels)
e2esset.SetHTTPProbe(ss)
setHTTPProbe(ss)
ss, err := c.AppsV1().StatefulSets(ns).Create(ss)
framework.ExpectNoError(err)
e2esset.WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)
ss = e2esset.WaitForStatus(c, ss)
ss = waitForStatus(c, ss)
ginkgo.By("getting scale subresource")
scale, err := c.AppsV1().StatefulSets(ns).GetScale(ssName, metav1.GetOptions{})
@@ -1064,11 +1084,11 @@ func pollReadWithTimeout(statefulPod statefulPodTester, statefulPodNumber int, k
// This function is used by two tests to test StatefulSet rollbacks: one using
// PVCs and one using no storage.
func rollbackTest(c clientset.Interface, ns string, ss *appsv1.StatefulSet) {
e2esset.SetHTTPProbe(ss)
setHTTPProbe(ss)
ss, err := c.AppsV1().StatefulSets(ns).Create(ss)
framework.ExpectNoError(err)
e2esset.WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)
ss = e2esset.WaitForStatus(c, ss)
ss = waitForStatus(c, ss)
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
framework.ExpectEqual(currentRevision, updateRevision, fmt.Sprintf("StatefulSet %s/%s created with update revision %s not equal to current revision %s",
ss.Namespace, ss.Name, updateRevision, currentRevision))
@@ -1081,31 +1101,31 @@ func rollbackTest(c clientset.Interface, ns string, ss *appsv1.StatefulSet) {
currentRevision))
}
e2esset.SortStatefulPods(pods)
err = e2esset.BreakPodHTTPProbe(ss, &pods.Items[1])
err = breakPodHTTPProbe(ss, &pods.Items[1])
framework.ExpectNoError(err)
ss, pods = e2esset.WaitForPodNotReady(c, ss, pods.Items[1].Name)
ss, pods = waitForPodNotReady(c, ss, pods.Items[1].Name)
newImage := NewWebserverImage
oldImage := ss.Spec.Template.Spec.Containers[0].Image
ginkgo.By(fmt.Sprintf("Updating StatefulSet template: update image from %s to %s", oldImage, newImage))
framework.ExpectNotEqual(oldImage, newImage, "Incorrect test setup: should update to a different image")
ss, err = e2esset.UpdateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
ss, err = updateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
update.Spec.Template.Spec.Containers[0].Image = newImage
})
framework.ExpectNoError(err)
ginkgo.By("Creating a new revision")
ss = e2esset.WaitForStatus(c, ss)
ss = waitForStatus(c, ss)
currentRevision, updateRevision = ss.Status.CurrentRevision, ss.Status.UpdateRevision
framework.ExpectNotEqual(currentRevision, updateRevision, "Current revision should not equal update revision during rolling update")
ginkgo.By("Updating Pods in reverse ordinal order")
pods = e2esset.GetPodList(c, ss)
e2esset.SortStatefulPods(pods)
err = e2esset.RestorePodHTTPProbe(ss, &pods.Items[1])
err = restorePodHTTPProbe(ss, &pods.Items[1])
framework.ExpectNoError(err)
ss, pods = e2esset.WaitForPodReady(c, ss, pods.Items[1].Name)
ss, pods = e2esset.WaitForRollingUpdate(c, ss)
ss, pods = waitForRollingUpdate(c, ss)
framework.ExpectEqual(ss.Status.CurrentRevision, updateRevision, fmt.Sprintf("StatefulSet %s/%s current revision %s does not equal update revision %s on update completion",
ss.Namespace,
ss.Name,
@@ -1125,16 +1145,16 @@ func rollbackTest(c clientset.Interface, ns string, ss *appsv1.StatefulSet) {
}
ginkgo.By("Rolling back to a previous revision")
err = e2esset.BreakPodHTTPProbe(ss, &pods.Items[1])
err = breakPodHTTPProbe(ss, &pods.Items[1])
framework.ExpectNoError(err)
ss, pods = e2esset.WaitForPodNotReady(c, ss, pods.Items[1].Name)
ss, pods = waitForPodNotReady(c, ss, pods.Items[1].Name)
priorRevision := currentRevision
currentRevision, updateRevision = ss.Status.CurrentRevision, ss.Status.UpdateRevision
ss, err = e2esset.UpdateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
ss, err = updateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
update.Spec.Template.Spec.Containers[0].Image = oldImage
})
framework.ExpectNoError(err)
ss = e2esset.WaitForStatus(c, ss)
ss = waitForStatus(c, ss)
currentRevision, updateRevision = ss.Status.CurrentRevision, ss.Status.UpdateRevision
framework.ExpectEqual(priorRevision, updateRevision, "Prior revision should equal update revision during roll back")
framework.ExpectNotEqual(currentRevision, updateRevision, "Current revision should not equal update revision during roll back")
@@ -1142,9 +1162,9 @@ func rollbackTest(c clientset.Interface, ns string, ss *appsv1.StatefulSet) {
ginkgo.By("Rolling back update in reverse ordinal order")
pods = e2esset.GetPodList(c, ss)
e2esset.SortStatefulPods(pods)
e2esset.RestorePodHTTPProbe(ss, &pods.Items[1])
restorePodHTTPProbe(ss, &pods.Items[1])
ss, pods = e2esset.WaitForPodReady(c, ss, pods.Items[1].Name)
ss, pods = e2esset.WaitForRollingUpdate(c, ss)
ss, pods = waitForRollingUpdate(c, ss)
framework.ExpectEqual(ss.Status.CurrentRevision, priorRevision, fmt.Sprintf("StatefulSet %s/%s current revision %s does not equal prior revision %s on rollback completion",
ss.Namespace,
ss.Name,
@@ -1187,3 +1207,108 @@ func confirmStatefulPodCount(c clientset.Interface, count int, ss *appsv1.Statef
time.Sleep(1 * time.Second)
}
}
// setHTTPProbe sets the pod template's ReadinessProbe for Webserver StatefulSet containers.
// This probe can then be controlled with breakHTTPProbe() and restoreHTTPProbe().
// Note that this cannot be used together with PauseNewPods().
func setHTTPProbe(ss *appsv1.StatefulSet) {
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = httpProbe
}
// breakHTTPProbe breaks the readiness probe for Webserver StatefulSet containers in ss.
func breakHTTPProbe(c clientset.Interface, ss *appsv1.StatefulSet) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
}
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /usr/local/apache2/htdocs%v /tmp/ || true", path)
return e2esset.ExecInStatefulPods(c, ss, cmd)
}
// breakPodHTTPProbe breaks the readiness probe for Webserver StatefulSet containers in one pod.
func breakPodHTTPProbe(ss *appsv1.StatefulSet, pod *v1.Pod) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
}
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /usr/local/apache2/htdocs%v /tmp/ || true", path)
stdout, err := framework.RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, statefulSetPoll, statefulPodTimeout)
framework.Logf("stdout of %v on %v: %v", cmd, pod.Name, stdout)
return err
}
// restoreHTTPProbe restores the readiness probe for Webserver StatefulSet containers in ss.
func restoreHTTPProbe(c clientset.Interface, ss *appsv1.StatefulSet) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
}
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /tmp%v /usr/local/apache2/htdocs/ || true", path)
return e2esset.ExecInStatefulPods(c, ss, cmd)
}
// restorePodHTTPProbe restores the readiness probe for Webserver StatefulSet containers in pod.
func restorePodHTTPProbe(ss *appsv1.StatefulSet, pod *v1.Pod) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
}
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /tmp%v /usr/local/apache2/htdocs/ || true", path)
stdout, err := framework.RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, statefulSetPoll, statefulPodTimeout)
framework.Logf("stdout of %v on %v: %v", cmd, pod.Name, stdout)
return err
}
// deleteStatefulPodAtIndex deletes the Pod with ordinal index in ss.
func deleteStatefulPodAtIndex(c clientset.Interface, index int, ss *appsv1.StatefulSet) {
name := getStatefulSetPodNameAtIndex(index, ss)
noGrace := int64(0)
if err := c.CoreV1().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
framework.Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err)
}
}
// getStatefulSetPodNameAtIndex returns the formatted pod name for the given index.
func getStatefulSetPodNameAtIndex(index int, ss *appsv1.StatefulSet) string {
// TODO: we won't use "-index" as the name strategy forever,
// pull the name out from an identity mapper.
return fmt.Sprintf("%v-%v", ss.Name, index)
}
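Together, the two helpers above give specs a cheap way to kill a specific ordinal and let the controller heal it. A short usage sketch, assuming c and ss are in scope as in the specs earlier in this file:

// Force-delete the lowest ordinal (grace period 0, so it is removed
// immediately) and wait for the StatefulSet controller to recreate it.
deleteStatefulPodAtIndex(c, 0, ss)
e2esset.WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)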
type updateStatefulSetFunc func(*appsv1.StatefulSet)
// updateStatefulSetWithRetries updates the statefulset template with retries.
func updateStatefulSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateStatefulSetFunc) (statefulSet *appsv1.StatefulSet, err error) {
statefulSets := c.AppsV1().StatefulSets(namespace)
var updateErr error
pollErr := wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
if statefulSet, err = statefulSets.Get(name, metav1.GetOptions{}); err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(statefulSet)
if statefulSet, err = statefulSets.Update(statefulSet); err == nil {
framework.Logf("Updating stateful set %s", name)
return true, nil
}
updateErr = err
return false, nil
})
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("couldn't apply the provided update to stateful set %q: %v", name, updateErr)
}
return statefulSet, pollErr
}
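A usage sketch for the retry helper, mirroring the call sites above (c, ns, ss, and newImage assumed in scope). The mutation lives in a closure so each retry re-applies it to a freshly fetched object, which is what lets resourceVersion conflicts heal:

ss, err = updateStatefulSetWithRetries(c, ns, ss.Name, func(update *appsv1.StatefulSet) {
	// Re-applied on every poll iteration against the latest object.
	update.Spec.Template.Spec.Containers[0].Image = newImage
})
framework.ExpectNoError(err)
ss = waitForStatus(c, ss) // observe the new update revision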
// getStatefulSet gets the StatefulSet named name in namespace.
func getStatefulSet(c clientset.Interface, namespace, name string) *appsv1.StatefulSet {
ss, err := c.AppsV1().StatefulSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
framework.Failf("Failed to get StatefulSet %s/%s: %v", namespace, name, err)
}
return ss
}

test/e2e/apps/wait.go (new file, 155 lines added)

@@ -0,0 +1,155 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apps
import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/test/e2e/framework"
e2esset "k8s.io/kubernetes/test/e2e/framework/statefulset"
)
// waitForPartitionedRollingUpdate waits for all Pods in set to exist and have the correct revision. set must have
// a RollingUpdateStatefulSetStrategyType with a non-nil RollingUpdate and Partition. All Pods with ordinals less
// than the Partition are expected to be at set's current revision. All other Pods are expected to be
// at its update revision.
func waitForPartitionedRollingUpdate(c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
if set.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
framework.Failf("StatefulSet %s/%s attempt to wait for partitioned update with updateStrategy %s",
set.Namespace,
set.Name,
set.Spec.UpdateStrategy.Type)
}
if set.Spec.UpdateStrategy.RollingUpdate == nil || set.Spec.UpdateStrategy.RollingUpdate.Partition == nil {
framework.Failf("StatefulSet %s/%s attempt to wait for partitioned update with nil RollingUpdate or nil Partition",
set.Namespace,
set.Name)
}
e2esset.WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
if len(pods.Items) < int(*set.Spec.Replicas) {
return false, nil
}
if partition <= 0 && set.Status.UpdateRevision != set.Status.CurrentRevision {
framework.Logf("Waiting for StatefulSet %s/%s to complete update",
set.Namespace,
set.Name,
)
e2esset.SortStatefulPods(pods)
for i := range pods.Items {
if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
set.Status.UpdateRevision,
pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
}
}
return false, nil
}
for i := int(*set.Spec.Replicas) - 1; i >= partition; i-- {
if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
set.Status.UpdateRevision,
pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
return false, nil
}
}
return true, nil
})
return set, pods
}
// waitForStatus waits for the StatefulSetStatus's ObservedGeneration to be greater than or equal to set's Generation.
// The returned StatefulSet contains such a StatefulSetStatus.
func waitForStatus(c clientset.Interface, set *appsv1.StatefulSet) *appsv1.StatefulSet {
e2esset.WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods *v1.PodList) (bool, error) {
if set2.Status.ObservedGeneration >= set.Generation {
set = set2
return true, nil
}
return false, nil
})
return set
}
// waitForPodNotReady waits for the Pod named podName in set to exist and to not have a Ready condition.
func waitForPodNotReady(c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
e2esset.WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
for i := range pods.Items {
if pods.Items[i].Name == podName {
return !podutil.IsPodReady(&pods.Items[i]), nil
}
}
return false, nil
})
return set, pods
}
// waitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to
// complete. set must have a RollingUpdateStatefulSetStrategyType.
func waitForRollingUpdate(c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
if set.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
framework.Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s",
set.Namespace,
set.Name,
set.Spec.UpdateStrategy.Type)
}
e2esset.WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
if len(pods.Items) < int(*set.Spec.Replicas) {
return false, nil
}
if set.Status.UpdateRevision != set.Status.CurrentRevision {
framework.Logf("Waiting for StatefulSet %s/%s to complete update",
set.Namespace,
set.Name,
)
e2esset.SortStatefulPods(pods)
for i := range pods.Items {
if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
set.Status.UpdateRevision,
pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
}
}
return false, nil
}
return true, nil
})
return set, pods
}
// waitForRunningAndNotReady waits for numStatefulPods in ss to be Running and not Ready.
func waitForRunningAndNotReady(c clientset.Interface, numStatefulPods int32, ss *appsv1.StatefulSet) {
e2esset.WaitForRunning(c, numStatefulPods, 0, ss)
}
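How these waits compose in practice, following the scale-test flow earlier in this commit (c and ss assumed in scope): break the shared probe, confirm the pods are Running but not Ready, then restore the probe and wait for full readiness:

breakHTTPProbe(c, ss)
waitForRunningAndNotReady(c, *ss.Spec.Replicas, ss)
e2esset.WaitForStatusReadyReplicas(c, ss, 0)
restoreHTTPProbe(c, ss)
e2esset.WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)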


@@ -18,7 +18,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",


@@ -27,7 +27,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
e2efwk "k8s.io/kubernetes/test/e2e/framework"
@@ -112,95 +111,11 @@ func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
}
}
// CreateStatefulSetService creates a Headless Service with Name name and Selector set to match labels.
func CreateStatefulSetService(name string, labels map[string]string) *v1.Service {
headlessService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.ServiceSpec{
Selector: labels,
},
}
headlessService.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
}
headlessService.Spec.ClusterIP = "None"
return headlessService
}
// SetHTTPProbe sets the pod template's ReadinessProbe for Webserver StatefulSet containers.
// This probe can then be controlled with BreakHTTPProbe() and RestoreHTTPProbe().
// Note that this cannot be used together with PauseNewPods().
func SetHTTPProbe(ss *appsv1.StatefulSet) {
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = httpProbe
}
// BreakHTTPProbe breaks the readiness probe for Nginx StatefulSet containers in ss.
func BreakHTTPProbe(c clientset.Interface, ss *appsv1.StatefulSet) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
}
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /usr/local/apache2/htdocs%v /tmp/ || true", path)
return ExecInStatefulPods(c, ss, cmd)
}
// BreakPodHTTPProbe breaks the readiness probe for Nginx StatefulSet containers in one pod.
func BreakPodHTTPProbe(ss *appsv1.StatefulSet, pod *v1.Pod) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
}
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /usr/local/apache2/htdocs%v /tmp/ || true", path)
stdout, err := e2efwk.RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
e2efwk.Logf("stdout of %v on %v: %v", cmd, pod.Name, stdout)
return err
}
// RestoreHTTPProbe restores the readiness probe for Nginx StatefulSet containers in ss.
func RestoreHTTPProbe(c clientset.Interface, ss *appsv1.StatefulSet) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
}
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /tmp%v /usr/local/apache2/htdocs/ || true", path)
return ExecInStatefulPods(c, ss, cmd)
}
// RestorePodHTTPProbe restores the readiness probe for Nginx StatefulSet containers in pod.
func RestorePodHTTPProbe(ss *appsv1.StatefulSet, pod *v1.Pod) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
}
// Ignore 'mv' errors to make this idempotent.
cmd := fmt.Sprintf("mv -v /tmp%v /usr/local/apache2/htdocs/ || true", path)
stdout, err := e2efwk.RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
e2efwk.Logf("stdout of %v on %v: %v", cmd, pod.Name, stdout)
return err
}
func hasPauseProbe(pod *v1.Pod) bool {
probe := pod.Spec.Containers[0].ReadinessProbe
return probe != nil && reflect.DeepEqual(probe.Exec.Command, pauseProbe.Exec.Command)
}
var httpProbe = &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80},
},
},
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 1,
}
var pauseProbe = &v1.Probe{
Handler: v1.Handler{
Exec: &v1.ExecAction{Command: []string{"test", "-f", "/data/statefulset-continue"}},


@@ -68,15 +68,6 @@ func GetPodList(c clientset.Interface, ss *appsv1.StatefulSet) *v1.PodList {
return podList
}
// DeleteStatefulPodAtIndex deletes the Pod with ordinal index in ss.
func DeleteStatefulPodAtIndex(c clientset.Interface, index int, ss *appsv1.StatefulSet) {
name := getStatefulSetPodNameAtIndex(index, ss)
noGrace := int64(0)
if err := c.CoreV1().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
e2efwk.Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err)
}
}
// DeleteAllStatefulSets deletes all StatefulSet API Objects in Namespace ns.
func DeleteAllStatefulSets(c clientset.Interface, ns string) {
ssList, err := c.AppsV1().StatefulSets(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
@@ -149,29 +140,6 @@ func DeleteAllStatefulSets(c clientset.Interface, ns string) {
}
}
// UpdateStatefulSetWithRetries updates statfulset template with retries.
func UpdateStatefulSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateStatefulSetFunc) (statefulSet *appsv1.StatefulSet, err error) {
statefulSets := c.AppsV1().StatefulSets(namespace)
var updateErr error
pollErr := wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
if statefulSet, err = statefulSets.Get(name, metav1.GetOptions{}); err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(statefulSet)
if statefulSet, err = statefulSets.Update(statefulSet); err == nil {
e2efwk.Logf("Updating stateful set %s", name)
return true, nil
}
updateErr = err
return false, nil
})
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("couldn't apply the provided updated to stateful set %q: %v", name, updateErr)
}
return statefulSet, pollErr
}
// Scale scales ss to count replicas.
func Scale(c clientset.Interface, ss *appsv1.StatefulSet, count int32) (*appsv1.StatefulSet, error) {
name := ss.Name
@@ -218,15 +186,6 @@ func Restart(c clientset.Interface, ss *appsv1.StatefulSet) {
update(c, ss.Namespace, ss.Name, func(ss *appsv1.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas })
}
// GetStatefulSet gets the StatefulSet named name in namespace.
func GetStatefulSet(c clientset.Interface, namespace, name string) *appsv1.StatefulSet {
ss, err := c.AppsV1().StatefulSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
e2efwk.Failf("Failed to get StatefulSet %s/%s: %v", namespace, name, err)
}
return ss
}
// CheckHostname verifies that all Pods in ss have the correct Hostname. If the returned error is not nil then verification failed.
func CheckHostname(c clientset.Interface, ss *appsv1.StatefulSet) error {
cmd := "printf $(hostname)"
@@ -285,19 +244,6 @@ func ExecInStatefulPods(c clientset.Interface, ss *appsv1.StatefulSet, cmd strin
return nil
}
type updateStatefulSetFunc func(*appsv1.StatefulSet)
// VerifyStatefulPodFunc is a func that examines a StatefulSetPod.
type VerifyStatefulPodFunc func(*v1.Pod)
// VerifyPodAtIndex applies a visitor pattern to the Pod at index in ss. verify is applied to the Pod to "visit" it.
func VerifyPodAtIndex(c clientset.Interface, index int, ss *appsv1.StatefulSet, verify VerifyStatefulPodFunc) {
name := getStatefulSetPodNameAtIndex(index, ss)
pod, err := c.CoreV1().Pods(ss.Namespace).Get(name, metav1.GetOptions{})
e2efwk.ExpectNoError(err, fmt.Sprintf("Failed to get stateful pod %s for StatefulSet %s/%s", name, ss.Namespace, ss.Name))
verify(pod)
}
// update updates a statefulset, and it is only used within rest.go
func update(c clientset.Interface, ns, name string, update func(ss *appsv1.StatefulSet)) *appsv1.StatefulSet {
for i := 0; i < 3; i++ {
@@ -317,10 +263,3 @@ func update(c clientset.Interface, ns, name string, update func(ss *appsv1.State
e2efwk.Failf("too many retries draining statefulset %q", name)
return nil
}
// getStatefulSetPodNameAtIndex gets formated pod name given index.
func getStatefulSetPodNameAtIndex(index int, ss *appsv1.StatefulSet) string {
// TODO: we won't use "-index" as the name strategy forever,
// pull the name out from an identity mapper.
return fmt.Sprintf("%v-%v", ss.Name, index)
}


@@ -28,62 +28,6 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
)
// WaitForPartitionedRollingUpdate waits for all Pods in set to exist and have the correct revision. set must have
// a RollingUpdateStatefulSetStrategyType with a non-nil RollingUpdate and Partition. All Pods with ordinals less
// than or equal to the Partition are expected to be at set's current revision. All other Pods are expected to be
// at its update revision.
func WaitForPartitionedRollingUpdate(c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
if set.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
framework.Failf("StatefulSet %s/%s attempt to wait for partitioned update with updateStrategy %s",
set.Namespace,
set.Name,
set.Spec.UpdateStrategy.Type)
}
if set.Spec.UpdateStrategy.RollingUpdate == nil || set.Spec.UpdateStrategy.RollingUpdate.Partition == nil {
framework.Failf("StatefulSet %s/%s attempt to wait for partitioned update with nil RollingUpdate or nil Partition",
set.Namespace,
set.Name)
}
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
if len(pods.Items) < int(*set.Spec.Replicas) {
return false, nil
}
if partition <= 0 && set.Status.UpdateRevision != set.Status.CurrentRevision {
framework.Logf("Waiting for StatefulSet %s/%s to complete update",
set.Namespace,
set.Name,
)
SortStatefulPods(pods)
for i := range pods.Items {
if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
set.Status.UpdateRevision,
pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
}
}
return false, nil
}
for i := int(*set.Spec.Replicas) - 1; i >= partition; i-- {
if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
set.Status.UpdateRevision,
pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
return false, nil
}
}
return true, nil
})
return set, pods
}
// WaitForRunning waits for numPodsRunning in ss to be Running and for the first
// numPodsReady ordinals to be Ready.
func WaitForRunning(c clientset.Interface, numPodsRunning, numPodsReady int32, ss *appsv1.StatefulSet) {
@@ -130,19 +74,6 @@ func WaitForState(c clientset.Interface, ss *appsv1.StatefulSet, until func(*app
}
}
// WaitForStatus waits for the StatefulSetStatus's ObservedGeneration to be greater than or equal to set's Generation.
// The returned StatefulSet contains such a StatefulSetStatus
func WaitForStatus(c clientset.Interface, set *appsv1.StatefulSet) *appsv1.StatefulSet {
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods *v1.PodList) (bool, error) {
if set2.Status.ObservedGeneration >= set.Generation {
set = set2
return true, nil
}
return false, nil
})
return set
}
// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready.
func WaitForRunningAndReady(c clientset.Interface, numStatefulPods int32, ss *appsv1.StatefulSet) {
WaitForRunning(c, numStatefulPods, numStatefulPods, ss)
@@ -164,65 +95,6 @@ func WaitForPodReady(c clientset.Interface, set *appsv1.StatefulSet, podName str
return set, pods
}
// WaitForPodNotReady waits for the Pod named podName in set to exist and to not have a Ready condition.
func WaitForPodNotReady(c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
for i := range pods.Items {
if pods.Items[i].Name == podName {
return !podutil.IsPodReady(&pods.Items[i]), nil
}
}
return false, nil
})
return set, pods
}
// WaitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to
// complete. set must have a RollingUpdateStatefulSetStrategyType.
func WaitForRollingUpdate(c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
if set.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
framework.Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s",
set.Namespace,
set.Name,
set.Spec.UpdateStrategy.Type)
}
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
if len(pods.Items) < int(*set.Spec.Replicas) {
return false, nil
}
if set.Status.UpdateRevision != set.Status.CurrentRevision {
framework.Logf("Waiting for StatefulSet %s/%s to complete update",
set.Namespace,
set.Name,
)
SortStatefulPods(pods)
for i := range pods.Items {
if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
set.Status.UpdateRevision,
pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
}
}
return false, nil
}
return true, nil
})
return set, pods
}
// WaitForRunningAndNotReady waits for numStatefulPods in ss to be Running and not Ready.
func WaitForRunningAndNotReady(c clientset.Interface, numStatefulPods int32, ss *appsv1.StatefulSet) {
WaitForRunning(c, numStatefulPods, 0, ss)
}
// WaitForStatusReadyReplicas waits for the ss.Status.ReadyReplicas to be equal to expectedReplicas
func WaitForStatusReadyReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
framework.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)


@@ -21,6 +21,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/kubernetes/test/e2e/framework"
@@ -28,6 +29,23 @@
"k8s.io/kubernetes/test/e2e/upgrades"
)
// createStatefulSetService creates a Headless Service with Name name and Selector set to match labels.
func createStatefulSetService(name string, labels map[string]string) *v1.Service {
headlessService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.ServiceSpec{
Selector: labels,
},
}
headlessService.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
}
headlessService.Spec.ClusterIP = "None"
return headlessService
}
// StatefulSetUpgradeTest implements an upgrade test harness for StatefulSet upgrade testing.
type StatefulSetUpgradeTest struct {
service *v1.Service
@@ -61,7 +79,7 @@ func (t *StatefulSetUpgradeTest) Setup(f *framework.Framework) {
podMounts := []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
ns := f.Namespace.Name
t.set = e2esset.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
t.service = e2esset.CreateStatefulSetService(ssName, labels)
t.service = createStatefulSetService(ssName, labels)
*(t.set.Spec.Replicas) = 3
e2esset.PauseNewPods(t.set)