Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-05 18:24:07 +00:00)
Adds StatefulSet upgrade tests and moves common functionality into the
framework package. This removes the potential for cyclic dependencies while allowing for code reuse.
This commit is contained in:
parent 6ea92b47eb · commit 4506744def
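Moving the helpers into test/e2e/framework matters for dependency direction: the upgrades package cannot import package e2e (which itself imports upgrades) without creating an import cycle, but both can import framework. The sketch below is a hypothetical illustration of the shape this enables, not the commit's actual StatefulSetUpgradeTest (which is not shown in this excerpt); the Setup/Teardown method set, names, and labels are assumptions. Only the framework identifiers added by this commit are real:

// Hypothetical sketch of an upgrade test built on the new framework helpers.
package upgrades

import (
	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
	"k8s.io/kubernetes/test/e2e/framework"
)

// StatefulSetUpgradeTest deploys a StatefulSet before an upgrade and verifies it after.
type StatefulSetUpgradeTest struct {
	tester *framework.StatefulSetTester
	ss     *apps.StatefulSet
}

// Setup creates a headless service plus a 3-replica StatefulSet and waits for readiness.
func (t *StatefulSetUpgradeTest) Setup(f *framework.Framework) {
	labels := map[string]string{"app": "upgrade-ss"} // assumed labels
	t.tester = framework.NewStatefulSetTester(f.ClientSet)
	t.ss = framework.NewStatefulSet("upgrade-ss", f.Namespace.Name, "upgrade-svc", 3, nil, nil, labels)

	svc := framework.CreateStatefulSetService("upgrade-svc", labels)
	_, err := f.ClientSet.Core().Services(t.ss.Namespace).Create(svc)
	framework.ExpectNoError(err)

	_, err = f.ClientSet.Apps().StatefulSets(t.ss.Namespace).Create(t.ss)
	framework.ExpectNoError(err)
	t.tester.WaitForRunningAndReady(*t.ss.Spec.Replicas, t.ss)
}

// Teardown scales down and deletes everything, waiting for PVCs and PVs to be reclaimed.
func (t *StatefulSetUpgradeTest) Teardown(f *framework.Framework) {
	framework.DeleteAllStatefulSets(f.ClientSet, t.ss.Namespace)
}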
@@ -32,6 +32,7 @@ import (
 var upgradeTests = []upgrades.Test{
 	&upgrades.ServiceUpgradeTest{},
 	&upgrades.SecretUpgradeTest{},
+	&upgrades.StatefulSetUpgradeTest{},
 	&upgrades.DeploymentUpgradeTest{},
 }
 
@@ -330,7 +330,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			}
 		})
 		// using out of statefulset e2e as deleting pvc is a pain
-		deleteAllStatefulSets(c, ns)
+		framework.DeleteAllStatefulSets(c, ns)
 	})
 })
 
@@ -27,6 +27,7 @@ go_library(
         "pods.go",
         "resource_usage_gatherer.go",
         "service_util.go",
+        "statefulset_utils.go",
         "test_context.go",
         "util.go",
     ],
@@ -86,6 +87,7 @@ go_library(
         "//vendor:google.golang.org/api/compute/v1",
         "//vendor:google.golang.org/api/googleapi",
         "//vendor:k8s.io/apimachinery/pkg/api/errors",
+        "//vendor:k8s.io/apimachinery/pkg/api/resource",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1/unstructured",
         "//vendor:k8s.io/apimachinery/pkg/fields",
@@ -102,6 +104,7 @@ go_library(
        "//vendor:k8s.io/apimachinery/pkg/util/uuid",
        "//vendor:k8s.io/apimachinery/pkg/util/validation",
        "//vendor:k8s.io/apimachinery/pkg/util/wait",
+       "//vendor:k8s.io/apimachinery/pkg/util/yaml",
        "//vendor:k8s.io/apimachinery/pkg/watch",
        "//vendor:k8s.io/client-go/discovery",
        "//vendor:k8s.io/client-go/dynamic",
test/e2e/framework/statefulset_utils.go (new file, 558 lines)
@@ -0,0 +1,558 @@
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
	"fmt"
	"io/ioutil"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	. "github.com/onsi/gomega"

	apierrs "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilyaml "k8s.io/apimachinery/pkg/util/yaml"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/v1"
	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)

const (
	// Poll interval for StatefulSet tests
	StatefulSetPoll = 10 * time.Second
	// Timeout interval for StatefulSet operations
	StatefulSetTimeout = 10 * time.Minute
	// Timeout for stateful pods to change state
	StatefulPodTimeout = 5 * time.Minute
)

// CreateStatefulSetService creates a Headless Service with Name name and Selector set to match labels.
func CreateStatefulSetService(name string, labels map[string]string) *v1.Service {
	headlessService := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Spec: v1.ServiceSpec{
			Selector: labels,
		},
	}
	headlessService.Spec.Ports = []v1.ServicePort{
		{Port: 80, Name: "http", Protocol: "TCP"},
	}
	headlessService.Spec.ClusterIP = "None"
	return headlessService
}

// StatefulSetFromManifest returns a StatefulSet from a manifest stored in fileName in the Namespace indicated by ns.
func StatefulSetFromManifest(fileName, ns string) *apps.StatefulSet {
	var ss apps.StatefulSet
	Logf("Parsing statefulset from %v", fileName)
	data, err := ioutil.ReadFile(fileName)
	Expect(err).NotTo(HaveOccurred())
	json, err := utilyaml.ToJSON(data)
	Expect(err).NotTo(HaveOccurred())

	Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &ss)).NotTo(HaveOccurred())
	ss.Namespace = ns
	if ss.Spec.Selector == nil {
		ss.Spec.Selector = &metav1.LabelSelector{
			MatchLabels: ss.Spec.Template.Labels,
		}
	}
	return &ss
}

// StatefulSetTester is a struct that contains utility methods for testing StatefulSet related functionality. It uses a
// clientset.Interface to communicate with the API server.
type StatefulSetTester struct {
	c clientset.Interface
}

// NewStatefulSetTester creates a StatefulSetTester that uses c to interact with the API server.
func NewStatefulSetTester(c clientset.Interface) *StatefulSetTester {
	return &StatefulSetTester{c}
}

// CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create.
func (s *StatefulSetTester) CreateStatefulSet(manifestPath, ns string) *apps.StatefulSet {
	mkpath := func(file string) string {
		return filepath.Join(TestContext.RepoRoot, manifestPath, file)
	}
	ss := StatefulSetFromManifest(mkpath("statefulset.yaml"), ns)

	Logf(fmt.Sprintf("creating " + ss.Name + " service"))
	RunKubectlOrDie("create", "-f", mkpath("service.yaml"), fmt.Sprintf("--namespace=%v", ns))

	Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector))
	RunKubectlOrDie("create", "-f", mkpath("statefulset.yaml"), fmt.Sprintf("--namespace=%v", ns))
	s.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
	return ss
}

// CheckMount checks that the mount at mountPath is valid for all Pods in ss.
func (s *StatefulSetTester) CheckMount(ss *apps.StatefulSet, mountPath string) error {
	for _, cmd := range []string{
		// Print inode, size etc
		fmt.Sprintf("ls -idlh %v", mountPath),
		// Print subdirs
		fmt.Sprintf("find %v", mountPath),
		// Try writing
		fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))),
	} {
		if err := s.ExecInStatefulPods(ss, cmd); err != nil {
			return fmt.Errorf("failed to execute %v, error: %v", cmd, err)
		}
	}
	return nil
}

// ExecInStatefulPods executes cmd in all Pods in ss. If an error occurs it is returned and cmd is not executed in any subsequent Pods.
func (s *StatefulSetTester) ExecInStatefulPods(ss *apps.StatefulSet, cmd string) error {
	podList := s.GetPodList(ss)
	for _, statefulPod := range podList.Items {
		stdout, err := RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
		Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
		if err != nil {
			return err
		}
	}
	return nil
}

// CheckHostname verifies that all Pods in ss have the correct Hostname. If the returned error is not nil, verification failed.
func (s *StatefulSetTester) CheckHostname(ss *apps.StatefulSet) error {
	cmd := "printf $(hostname)"
	podList := s.GetPodList(ss)
	for _, statefulPod := range podList.Items {
		hostname, err := RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
		if err != nil {
			return err
		}
		if hostname != statefulPod.Name {
			return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name)
		}
	}
	return nil
}

// Saturate waits for all Pods in ss to become Running and Ready.
func (s *StatefulSetTester) Saturate(ss *apps.StatefulSet) {
	var i int32
	for i = 0; i < *(ss.Spec.Replicas); i++ {
		Logf("Waiting for stateful pod at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
		s.WaitForRunningAndReady(i+1, ss)
		Logf("Marking stateful pod at index " + fmt.Sprintf("%v", i) + " healthy")
		s.SetHealthy(ss)
	}
}

// DeleteStatefulPodAtIndex deletes the Pod with ordinal index in ss.
func (s *StatefulSetTester) DeleteStatefulPodAtIndex(index int, ss *apps.StatefulSet) {
	name := getStatefulSetPodNameAtIndex(index, ss)
	noGrace := int64(0)
	if err := s.c.Core().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
		Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err)
	}
}

// VerifyStatefulPodFunc is a func that examines a StatefulSetPod.
type VerifyStatefulPodFunc func(*v1.Pod)

// VerifyPodAtIndex applies a visitor pattern to the Pod at index in ss. verify is applied to the Pod to "visit" it.
func (s *StatefulSetTester) VerifyPodAtIndex(index int, ss *apps.StatefulSet, verify VerifyStatefulPodFunc) {
	name := getStatefulSetPodNameAtIndex(index, ss)
	pod, err := s.c.Core().Pods(ss.Namespace).Get(name, metav1.GetOptions{})
	Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get stateful pod %s for StatefulSet %s/%s", name, ss.Namespace, ss.Name))
	verify(pod)
}

func getStatefulSetPodNameAtIndex(index int, ss *apps.StatefulSet) string {
	// TODO: we won't use "-index" as the name strategy forever,
	// pull the name out from an identity mapper.
	return fmt.Sprintf("%v-%v", ss.Name, index)
}

// Scale scales ss to count replicas.
func (s *StatefulSetTester) Scale(ss *apps.StatefulSet, count int32) error {
	name := ss.Name
	ns := ss.Namespace
	s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count })

	var statefulPodList *v1.PodList
	pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
		statefulPodList = s.GetPodList(ss)
		if int32(len(statefulPodList.Items)) == count {
			return true, nil
		}
		return false, nil
	})
	if pollErr != nil {
		unhealthy := []string{}
		for _, statefulPod := range statefulPodList.Items {
			delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, v1.IsPodReady(&statefulPod)
			if delTs != nil || phase != v1.PodRunning || !readiness {
				unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
			}
		}
		return fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy)
	}
	return nil
}

// UpdateReplicas updates the replicas of ss to count.
func (s *StatefulSetTester) UpdateReplicas(ss *apps.StatefulSet, count int32) {
	s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { ss.Spec.Replicas = &count })
}

// Restart scales ss to 0 and then back to its previous number of replicas.
func (s *StatefulSetTester) Restart(ss *apps.StatefulSet) {
	oldReplicas := *(ss.Spec.Replicas)
	ExpectNoError(s.Scale(ss, 0))
	s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas })
}

func (s *StatefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) {
	for i := 0; i < 3; i++ {
		ss, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
		if err != nil {
			Failf("failed to get statefulset %q: %v", name, err)
		}
		update(ss)
		ss, err = s.c.Apps().StatefulSets(ns).Update(ss)
		if err == nil {
			return
		}
		if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
			Failf("failed to update statefulset %q: %v", name, err)
		}
	}
	Failf("too many retries draining statefulset %q", name)
}

// GetPodList gets the current Pods in ss.
func (s *StatefulSetTester) GetPodList(ss *apps.StatefulSet) *v1.PodList {
	selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
	ExpectNoError(err)
	podList, err := s.c.Core().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
	ExpectNoError(err)
	return podList
}

// ConfirmStatefulPodCount asserts that the current number of Pods in ss is count, waiting up to timeout for ss
// to scale to count.
func (s *StatefulSetTester) ConfirmStatefulPodCount(count int, ss *apps.StatefulSet, timeout time.Duration) {
	start := time.Now()
	deadline := start.Add(timeout)
	for t := time.Now(); t.Before(deadline); t = time.Now() {
		podList := s.GetPodList(ss)
		statefulPodCount := len(podList.Items)
		if statefulPodCount != count {
			Failf("StatefulSet %v unexpectedly scaled to %d -> %d replicas: %+v", ss.Name, count, len(podList.Items), podList)
		}
		Logf("Verifying statefulset %v doesn't scale past %d for another %+v", ss.Name, count, deadline.Sub(t))
		time.Sleep(1 * time.Second)
	}
}

func (s *StatefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.StatefulSet, shouldBeReady bool) {
	pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
		func() (bool, error) {
			podList := s.GetPodList(ss)
			if int32(len(podList.Items)) < numStatefulPods {
				Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numStatefulPods)
				return false, nil
			}
			if int32(len(podList.Items)) > numStatefulPods {
				return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numStatefulPods, len(podList.Items))
			}
			for _, p := range podList.Items {
				isReady := v1.IsPodReady(&p)
				desiredReadiness := shouldBeReady == isReady
				Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
				if p.Status.Phase != v1.PodRunning || !desiredReadiness {
					return false, nil
				}
			}
			return true, nil
		})
	if pollErr != nil {
		Failf("Failed waiting for pods to enter running: %v", pollErr)
	}
}

// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready.
func (s *StatefulSetTester) WaitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) {
	s.waitForRunning(numStatefulPods, ss, true)
}

// WaitForRunningAndNotReady waits for numStatefulPods in ss to be Running and not Ready.
func (s *StatefulSetTester) WaitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) {
	s.waitForRunning(numStatefulPods, ss, false)
}

// BreakProbe breaks the readiness probe for Nginx StatefulSet containers.
func (s *StatefulSetTester) BreakProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
	path := probe.HTTPGet.Path
	if path == "" {
		return fmt.Errorf("Path expected to be not empty: %v", path)
	}
	cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path)
	return s.ExecInStatefulPods(ss, cmd)
}

// RestoreProbe restores the readiness probe for Nginx StatefulSet containers.
func (s *StatefulSetTester) RestoreProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
	path := probe.HTTPGet.Path
	if path == "" {
		return fmt.Errorf("Path expected to be not empty: %v", path)
	}
	cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path)
	return s.ExecInStatefulPods(ss, cmd)
}

// SetHealthy updates the StatefulSet InitAnnotation to true in order to set a StatefulSet Pod to be Running and Ready.
func (s *StatefulSetTester) SetHealthy(ss *apps.StatefulSet) {
	podList := s.GetPodList(ss)
	markedHealthyPod := ""
	for _, pod := range podList.Items {
		if pod.Status.Phase != v1.PodRunning {
			Failf("Found pod in %v cannot set health", pod.Status.Phase)
		}
		if IsStatefulSetPodInitialized(pod) {
			continue
		}
		if markedHealthyPod != "" {
			Failf("Found multiple non-healthy stateful pods: %v and %v", pod.Name, markedHealthyPod)
		}
		p, err := UpdatePodWithRetries(s.c, pod.Namespace, pod.Name, func(update *v1.Pod) {
			update.Annotations[apps.StatefulSetInitAnnotation] = "true"
		})
		ExpectNoError(err)
		Logf("Set annotation %v to %v on pod %v", apps.StatefulSetInitAnnotation, p.Annotations[apps.StatefulSetInitAnnotation], pod.Name)
		markedHealthyPod = pod.Name
	}
}

func (s *StatefulSetTester) waitForStatus(ss *apps.StatefulSet, expectedReplicas int32) {
	Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)

	ns, name := ss.Namespace, ss.Name
	pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
		func() (bool, error) {
			ssGet, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			if ssGet.Status.Replicas != expectedReplicas {
				Logf("Waiting for stateful set status to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas)
				return false, nil
			}
			return true, nil
		})
	if pollErr != nil {
		Failf("Failed waiting for stateful set status.replicas updated to %d: %v", expectedReplicas, pollErr)
	}
}

// CheckServiceName asserts that the ServiceName for ss is equivalent to expectedServiceName.
func (p *StatefulSetTester) CheckServiceName(ss *apps.StatefulSet, expectedServiceName string) error {
	Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName)

	if expectedServiceName != ss.Spec.ServiceName {
		return fmt.Errorf("Wrong service name governing statefulset. Expected %s got %s",
			expectedServiceName, ss.Spec.ServiceName)
	}

	return nil
}

// DeleteAllStatefulSets deletes all StatefulSet API Objects in Namespace ns.
func DeleteAllStatefulSets(c clientset.Interface, ns string) {
	sst := &StatefulSetTester{c: c}
	ssList, err := c.Apps().StatefulSets(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
	ExpectNoError(err)

	// Scale down each statefulset, then delete it completely.
	// Deleting a pvc without doing this will leak volumes, #25101.
	errList := []string{}
	for _, ss := range ssList.Items {
		Logf("Scaling statefulset %v to 0", ss.Name)
		if err := sst.Scale(&ss, 0); err != nil {
			errList = append(errList, fmt.Sprintf("%v", err))
		}
		sst.waitForStatus(&ss, 0)
		Logf("Deleting statefulset %v", ss.Name)
		if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil {
			errList = append(errList, fmt.Sprintf("%v", err))
		}
	}

	// pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs.
	pvNames := sets.NewString()
	// TODO: Don't assume all pvcs in the ns belong to a statefulset
	pvcPollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
		pvcList, err := c.Core().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
		if err != nil {
			Logf("WARNING: Failed to list pvcs, retrying %v", err)
			return false, nil
		}
		for _, pvc := range pvcList.Items {
			pvNames.Insert(pvc.Spec.VolumeName)
			// TODO: Double check that there are no pods referencing the pvc
			Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName)
			if err := c.Core().PersistentVolumeClaims(ns).Delete(pvc.Name, nil); err != nil {
				return false, nil
			}
		}
		return true, nil
	})
	if pvcPollErr != nil {
		errList = append(errList, fmt.Sprintf("Timeout waiting for pvc deletion."))
	}

	pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
		pvList, err := c.Core().PersistentVolumes().List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
		if err != nil {
			Logf("WARNING: Failed to list pvs, retrying %v", err)
			return false, nil
		}
		waitingFor := []string{}
		for _, pv := range pvList.Items {
			if pvNames.Has(pv.Name) {
				waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status))
			}
		}
		if len(waitingFor) == 0 {
			return true, nil
		}
		Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n"))
		return false, nil
	})
	if pollErr != nil {
		errList = append(errList, fmt.Sprintf("Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs."))
	}
	if len(errList) != 0 {
		ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n")))
	}
}

// IsStatefulSetPodInitialized returns true if pod's StatefulSetInitAnnotation exists and is set to true.
func IsStatefulSetPodInitialized(pod v1.Pod) bool {
	initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation]
	if !ok {
		return false
	}
	inited, err := strconv.ParseBool(initialized)
	if err != nil {
		Failf("Couldn't parse statefulset init annotations %v", initialized)
	}
	return inited
}

// NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets.
func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
	return v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
			Annotations: map[string]string{
				"volume.alpha.kubernetes.io/storage-class": "anything",
			},
		},
		Spec: v1.PersistentVolumeClaimSpec{
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
			},
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
				},
			},
		},
	}
}

// NewStatefulSet creates a new NGINX StatefulSet for testing. The StatefulSet is named name, is in namespace ns,
// statefulPodMounts are the mounts that will be backed by PVs, podMounts are the mounts that are mounted directly
// to the Pod, and labels are the labels that will be used for the StatefulSet selector.
func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *apps.StatefulSet {
	mounts := append(statefulPodMounts, podMounts...)
	claims := []v1.PersistentVolumeClaim{}
	for _, m := range statefulPodMounts {
		claims = append(claims, NewStatefulSetPVC(m.Name))
	}

	vols := []v1.Volume{}
	for _, m := range podMounts {
		vols = append(vols, v1.Volume{
			Name: m.Name,
			VolumeSource: v1.VolumeSource{
				HostPath: &v1.HostPathVolumeSource{
					Path: fmt.Sprintf("/tmp/%v", m.Name),
				},
			},
		})
	}

	return &apps.StatefulSet{
		TypeMeta: metav1.TypeMeta{
			Kind:       "StatefulSet",
			APIVersion: "apps/v1beta1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: ns,
		},
		Spec: apps.StatefulSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Replicas: func(i int32) *int32 { return &i }(replicas),
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      labels,
					Annotations: map[string]string{},
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:         "nginx",
							Image:        "gcr.io/google_containers/nginx-slim:0.7",
							VolumeMounts: mounts,
						},
					},
					Volumes: vols,
				},
			},
			VolumeClaimTemplates: claims,
			ServiceName:          governingSvcName,
		},
	}
}

// SetStatefulSetInitializedAnnotation sets the StatefulSetInitAnnotation to value.
func SetStatefulSetInitializedAnnotation(ss *apps.StatefulSet, value string) {
	ss.Spec.Template.ObjectMeta.Annotations["pod.alpha.kubernetes.io/initialized"] = value
}
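Taken together, the helpers support a create/scale/clean-up lifecycle. A minimal sketch of that flow, using only functions from the file above; the namespace argument and manifest path are placeholders, not values from this commit:

// Minimal sketch: exercising the new framework helpers from a test body.
func exerciseStatefulSetHelpers(c clientset.Interface, ns string) {
	sst := framework.NewStatefulSetTester(c)

	// CreateStatefulSet shells out to kubectl for service.yaml and
	// statefulset.yaml under the given path, then waits for readiness.
	ss := sst.CreateStatefulSet("test/e2e/testing-manifests/statefulset/zookeeper", ns)

	// Scale returns an error rather than failing the test outright,
	// so callers decide how to react.
	framework.ExpectNoError(sst.Scale(ss, 5))
	sst.WaitForRunningAndReady(5, ss)

	// Scale everything to zero, delete the API objects, and wait for
	// the PVCs and their PVs to be reclaimed (see #25101 above).
	framework.DeleteAllStatefulSets(c, ns)
}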
@@ -382,17 +382,17 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 			dumpDebugInfo(c, ns)
 		}
 		framework.Logf("Deleting all stateful set in ns %v", ns)
-		deleteAllStatefulSets(c, ns)
+		framework.DeleteAllStatefulSets(c, ns)
 	})
 
 	It("should come back up if node goes down [Slow] [Disruptive]", func() {
 		petMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
 		podMounts := []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
-		ps := newStatefulSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels)
+		ps := framework.NewStatefulSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels)
 		_, err := c.Apps().StatefulSets(ns).Create(ps)
 		Expect(err).NotTo(HaveOccurred())
 
-		pst := statefulSetTester{c: c}
+		pst := framework.NewStatefulSetTester(c)
 
 		nn := framework.TestContext.CloudConfig.NumNodes
 		nodeNames, err := framework.CheckNodesReady(f.ClientSet, framework.NodeReadyInitialTimeout, nn)
@@ -400,18 +400,18 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 		restartNodes(f, nodeNames)
 
 		By("waiting for pods to be running again")
-		pst.waitForRunningAndReady(*ps.Spec.Replicas, ps)
+		pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps)
 	})
 
 	It("should not reschedule stateful pods if there is a network partition [Slow] [Disruptive]", func() {
-		ps := newStatefulSet(psName, ns, headlessSvcName, 3, []v1.VolumeMount{}, []v1.VolumeMount{}, labels)
+		ps := framework.NewStatefulSet(psName, ns, headlessSvcName, 3, []v1.VolumeMount{}, []v1.VolumeMount{}, labels)
 		_, err := c.Apps().StatefulSets(ns).Create(ps)
 		Expect(err).NotTo(HaveOccurred())
 
-		pst := statefulSetTester{c: c}
-		pst.waitForRunningAndReady(*ps.Spec.Replicas, ps)
+		pst := framework.NewStatefulSetTester(c)
+		pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps)
 
-		pod := pst.getPodList(ps).Items[0]
+		pod := pst.GetPodList(ps).Items[0]
 		node, err := c.Core().Nodes().Get(pod.Spec.NodeName, metav1.GetOptions{})
 		framework.ExpectNoError(err)
 
@@ -430,7 +430,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 		}
 
 		By("waiting for pods to be running again")
-		pst.waitForRunningAndReady(*ps.Spec.Replicas, ps)
+		pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps)
 	})
 })
 
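The remaining hunks, both here and in the StatefulSet e2e file below, follow the same mechanical pattern, so a condensed before/after captures all of them (c, ps, and ns come from the surrounding test context):

// Before: unexported helpers defined inside package e2e.
pst := statefulSetTester{c: c}
pst.waitForRunningAndReady(*ps.Spec.Replicas, ps)
deleteAllStatefulSets(c, ns)

// After: the exported framework API introduced by this commit.
pst := framework.NewStatefulSetTester(c)
pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps)
framework.DeleteAllStatefulSets(c, ns)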
@@ -18,27 +18,18 @@ package e2e
 
 import (
 	"fmt"
-	"io/ioutil"
-	"path/filepath"
-	"strconv"
 	"strings"
 	"time"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
-	apierrs "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	klabels "k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
-	utilyaml "k8s.io/apimachinery/pkg/util/yaml"
 	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -46,11 +37,6 @@ import (
 )
 
 const (
-	statefulsetPoll = 10 * time.Second
-	// Some statefulPods install base packages via wget
-	statefulsetTimeout = 10 * time.Minute
-	// Timeout for stateful pods to change state
-	statefulPodTimeout = 5 * time.Minute
 	zookeeperManifestPath   = "test/e2e/testing-manifests/statefulset/zookeeper"
 	mysqlGaleraManifestPath = "test/e2e/testing-manifests/statefulset/mysql-galera"
 	redisManifestPath       = "test/e2e/testing-manifests/statefulset/redis"
@@ -87,7 +73,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 	BeforeEach(func() {
 		statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
 		podMounts = []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
-		ss = newStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
+		ss = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
 
 		By("Creating service " + headlessSvcName + " in namespace " + ns)
 		headlessService := createServiceSpec(headlessSvcName, "", true, labels)
@@ -100,80 +86,80 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 			dumpDebugInfo(c, ns)
 		}
 		framework.Logf("Deleting all statefulset in ns %v", ns)
-		deleteAllStatefulSets(c, ns)
+		framework.DeleteAllStatefulSets(c, ns)
 	})
 
 	It("should provide basic identity", func() {
 		By("Creating statefulset " + ssName + " in namespace " + ns)
 		*(ss.Spec.Replicas) = 3
-		setInitializedAnnotation(ss, "false")
+		framework.SetStatefulSetInitializedAnnotation(ss, "false")
 
 		_, err := c.Apps().StatefulSets(ns).Create(ss)
 		Expect(err).NotTo(HaveOccurred())
 
-		sst := statefulSetTester{c: c}
+		sst := framework.NewStatefulSetTester(c)
 
 		By("Saturating stateful set " + ss.Name)
-		sst.saturate(ss)
+		sst.Saturate(ss)
 
 		By("Verifying statefulset mounted data directory is usable")
-		framework.ExpectNoError(sst.checkMount(ss, "/data"))
+		framework.ExpectNoError(sst.CheckMount(ss, "/data"))
 
 		By("Verifying statefulset provides a stable hostname for each pod")
-		framework.ExpectNoError(sst.checkHostname(ss))
+		framework.ExpectNoError(sst.CheckHostname(ss))
 
 		By("Verifying statefulset set proper service name")
-		framework.ExpectNoError(sst.checkServiceName(ss, headlessSvcName))
+		framework.ExpectNoError(sst.CheckServiceName(ss, headlessSvcName))
 
 		cmd := "echo $(hostname) > /data/hostname; sync;"
 		By("Running " + cmd + " in all stateful pods")
-		framework.ExpectNoError(sst.execInStatefulPods(ss, cmd))
+		framework.ExpectNoError(sst.ExecInStatefulPods(ss, cmd))
 
 		By("Restarting statefulset " + ss.Name)
-		sst.restart(ss)
-		sst.saturate(ss)
+		sst.Restart(ss)
+		sst.Saturate(ss)
 
 		By("Verifying statefulset mounted data directory is usable")
-		framework.ExpectNoError(sst.checkMount(ss, "/data"))
+		framework.ExpectNoError(sst.CheckMount(ss, "/data"))
 
 		cmd = "if [ \"$(cat /data/hostname)\" = \"$(hostname)\" ]; then exit 0; else exit 1; fi"
 		By("Running " + cmd + " in all stateful pods")
-		framework.ExpectNoError(sst.execInStatefulPods(ss, cmd))
+		framework.ExpectNoError(sst.ExecInStatefulPods(ss, cmd))
 	})
 
 	It("should not deadlock when a pod's predecessor fails", func() {
 		By("Creating statefulset " + ssName + " in namespace " + ns)
 		*(ss.Spec.Replicas) = 2
-		setInitializedAnnotation(ss, "false")
+		framework.SetStatefulSetInitializedAnnotation(ss, "false")
 
 		_, err := c.Apps().StatefulSets(ns).Create(ss)
 		Expect(err).NotTo(HaveOccurred())
 
-		sst := statefulSetTester{c: c}
+		sst := framework.NewStatefulSetTester(c)
 
-		sst.waitForRunningAndReady(1, ss)
+		sst.WaitForRunningAndReady(1, ss)
 
 		By("Marking stateful pod at index 0 as healthy.")
-		sst.setHealthy(ss)
+		sst.SetHealthy(ss)
 
 		By("Waiting for stateful pod at index 1 to enter running.")
-		sst.waitForRunningAndReady(2, ss)
+		sst.WaitForRunningAndReady(2, ss)
 
 		// Now we have 1 healthy and 1 unhealthy stateful pod. Deleting the healthy stateful pod should *not*
 		// create a new stateful pod till the remaining stateful pod becomes healthy, which won't happen till
 		// we set the healthy bit.
 
 		By("Deleting healthy stateful pod at index 0.")
-		sst.deleteStatefulPodAtIndex(0, ss)
+		sst.DeleteStatefulPodAtIndex(0, ss)
 
 		By("Confirming stateful pod at index 0 is recreated.")
-		sst.waitForRunningAndReady(2, ss)
+		sst.WaitForRunningAndReady(2, ss)
 
 		By("Deleting unhealthy stateful pod at index 1.")
-		sst.deleteStatefulPodAtIndex(1, ss)
+		sst.DeleteStatefulPodAtIndex(1, ss)
 
 		By("Confirming all stateful pods in statefulset are created.")
-		sst.saturate(ss)
+		sst.Saturate(ss)
 	})
 
 	It("should allow template updates", func() {
@@ -183,9 +169,9 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 		ss, err := c.Apps().StatefulSets(ns).Create(ss)
 		Expect(err).NotTo(HaveOccurred())
 
-		sst := statefulSetTester{c: c}
+		sst := framework.NewStatefulSetTester(c)
 
-		sst.waitForRunningAndReady(*ss.Spec.Replicas, ss)
+		sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
 
 		newImage := newNginxImage
 		oldImage := ss.Spec.Template.Spec.Containers[0].Image
@@ -198,17 +184,17 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 
 		updateIndex := 0
 		By(fmt.Sprintf("Deleting stateful pod at index %d", updateIndex))
-		sst.deleteStatefulPodAtIndex(updateIndex, ss)
+		sst.DeleteStatefulPodAtIndex(updateIndex, ss)
 
 		By("Waiting for all stateful pods to be running again")
-		sst.waitForRunningAndReady(*ss.Spec.Replicas, ss)
+		sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
 
 		By(fmt.Sprintf("Verifying stateful pod at index %d is updated", updateIndex))
 		verify := func(pod *v1.Pod) {
 			podImage := pod.Spec.Containers[0].Image
 			Expect(podImage).To(Equal(newImage), fmt.Sprintf("Expected stateful pod image %s updated to %s", podImage, newImage))
 		}
-		sst.verifyPodAtIndex(updateIndex, ss, verify)
+		sst.VerifyPodAtIndex(updateIndex, ss, verify)
 	})
 
 	It("Scaling down before scale up is finished should wait until current pod will be running and ready before it will be removed", func() {
@@ -216,29 +202,29 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 		testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
 			Path: "/index.html",
 			Port: intstr.IntOrString{IntVal: 80}}}}
-		ss := newStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, labels)
+		ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, labels)
 		ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
-		setInitializedAnnotation(ss, "false")
+		framework.SetStatefulSetInitializedAnnotation(ss, "false")
 		ss, err := c.Apps().StatefulSets(ns).Create(ss)
 		Expect(err).NotTo(HaveOccurred())
-		sst := &statefulSetTester{c: c}
-		sst.waitForRunningAndReady(1, ss)
+		sst := framework.NewStatefulSetTester(c)
+		sst.WaitForRunningAndReady(1, ss)
 
 		By("Scaling up stateful set " + ssName + " to 3 replicas and pausing after 2nd pod")
-		sst.setHealthy(ss)
-		sst.updateReplicas(ss, 3)
-		sst.waitForRunningAndReady(2, ss)
+		sst.SetHealthy(ss)
+		sst.UpdateReplicas(ss, 3)
+		sst.WaitForRunningAndReady(2, ss)
 
 		By("Before scale up finished setting 2nd pod to be not ready by breaking readiness probe")
-		sst.breakProbe(ss, testProbe)
-		sst.waitForRunningAndNotReady(2, ss)
+		sst.BreakProbe(ss, testProbe)
+		sst.WaitForRunningAndNotReady(2, ss)
 
 		By("Continue scale operation after the 2nd pod, and scaling down to 1 replica")
-		sst.setHealthy(ss)
-		sst.updateReplicas(ss, 1)
+		sst.SetHealthy(ss)
+		sst.UpdateReplicas(ss, 1)
 
 		By("Verifying that the 2nd pod wont be removed if it is not running and ready")
-		sst.confirmStatefulPodCount(2, ss, 10*time.Second)
+		sst.ConfirmStatefulPodCount(2, ss, 10*time.Second)
 		expectedPodName := ss.Name + "-1"
 		expectedPod, err := f.ClientSet.Core().Pods(ns).Get(expectedPodName, metav1.GetOptions{})
 		Expect(err).NotTo(HaveOccurred())
@@ -251,8 +237,8 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Verifying the 2nd pod is removed only when it becomes running and ready")
-		sst.restoreProbe(ss, testProbe)
-		_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
+		sst.RestoreProbe(ss, testProbe)
+		_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
 			pod := event.Object.(*v1.Pod)
 			if event.Type == watch.Deleted && pod.Name == expectedPodName {
 				return false, fmt.Errorf("Pod %v was deleted before enter running", pod.Name)
@@ -282,28 +268,28 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 		testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
 			Path: "/index.html",
 			Port: intstr.IntOrString{IntVal: 80}}}}
-		ss := newStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
+		ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
 		ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
 		ss, err = c.Apps().StatefulSets(ns).Create(ss)
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns)
-		sst := &statefulSetTester{c: c}
-		sst.waitForRunningAndReady(*ss.Spec.Replicas, ss)
+		sst := framework.NewStatefulSetTester(c)
+		sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
 
 		By("Confirming that stateful set scale up will halt with unhealthy stateful pod")
-		sst.breakProbe(ss, testProbe)
-		sst.waitForRunningAndNotReady(*ss.Spec.Replicas, ss)
-		sst.updateReplicas(ss, 3)
-		sst.confirmStatefulPodCount(1, ss, 10*time.Second)
+		sst.BreakProbe(ss, testProbe)
+		sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss)
+		sst.UpdateReplicas(ss, 3)
+		sst.ConfirmStatefulPodCount(1, ss, 10*time.Second)
 
 		By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
-		sst.restoreProbe(ss, testProbe)
-		sst.waitForRunningAndReady(3, ss)
+		sst.RestoreProbe(ss, testProbe)
+		sst.WaitForRunningAndReady(3, ss)
 
 		By("Verifying that stateful set " + ssName + " was scaled up in order")
 		expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"}
-		_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
+		_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
 			if event.Type != watch.Added {
 				return false, nil
 			}
@@ -322,18 +308,18 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 		})
 		Expect(err).NotTo(HaveOccurred())
 
-		sst.breakProbe(ss, testProbe)
-		sst.waitForRunningAndNotReady(3, ss)
-		sst.updateReplicas(ss, 0)
-		sst.confirmStatefulPodCount(3, ss, 10*time.Second)
+		sst.BreakProbe(ss, testProbe)
+		sst.WaitForRunningAndNotReady(3, ss)
+		sst.UpdateReplicas(ss, 0)
+		sst.ConfirmStatefulPodCount(3, ss, 10*time.Second)
 
 		By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
-		sst.restoreProbe(ss, testProbe)
-		sst.scale(ss, 0)
+		sst.RestoreProbe(ss, testProbe)
+		sst.Scale(ss, 0)
 
 		By("Verifying that stateful set " + ssName + " was scaled down in reverse order")
 		expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"}
-		_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
+		_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
 			if event.Type != watch.Deleted {
 				return false, nil
 			}
@@ -375,7 +361,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 		framework.ExpectNoError(err)
 
 		By("Creating statefulset with conflicting port in namespace " + f.Namespace.Name)
-		ss := newStatefulSet(ssName, f.Namespace.Name, headlessSvcName, 1, nil, nil, labels)
+		ss := framework.NewStatefulSet(ssName, f.Namespace.Name, headlessSvcName, 1, nil, nil, labels)
 		statefulPodContainer := &ss.Spec.Template.Spec.Containers[0]
 		statefulPodContainer.Ports = append(statefulPodContainer.Ports, conflictingPort)
 		ss.Spec.Template.Spec.NodeName = node.Name
@@ -392,7 +378,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 		w, err := f.ClientSet.Core().Pods(f.Namespace.Name).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName}))
 		framework.ExpectNoError(err)
 		// we need to get UID from pod in any state and wait until stateful set controller will remove pod at least once
-		_, err = watch.Until(statefulPodTimeout, w, func(event watch.Event) (bool, error) {
+		_, err = watch.Until(framework.StatefulPodTimeout, w, func(event watch.Event) (bool, error) {
 			pod := event.Object.(*v1.Pod)
 			switch event.Type {
 			case watch.Deleted:
@@ -428,16 +414,16 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 				return fmt.Errorf("Pod %v wasn't recreated: %v == %v", statefulPod.Name, statefulPod.UID, initialStatefulPodUID)
 			}
 			return nil
-		}, statefulPodTimeout, 2*time.Second).Should(BeNil())
+		}, framework.StatefulPodTimeout, 2*time.Second).Should(BeNil())
 	})
 })
 
 framework.KubeDescribe("Deploy clustered applications [Feature:StatefulSet] [Slow]", func() {
-	var sst *statefulSetTester
+	var sst *framework.StatefulSetTester
 	var appTester *clusterAppTester
 
 	BeforeEach(func() {
-		sst = &statefulSetTester{c: c}
+		sst = framework.NewStatefulSetTester(c)
 		appTester = &clusterAppTester{tester: sst, ns: ns}
 	})
 
@@ -446,7 +432,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
 			dumpDebugInfo(c, ns)
 		}
 		framework.Logf("Deleting all statefulset in ns %v", ns)
-		deleteAllStatefulSets(c, ns)
+		framework.DeleteAllStatefulSets(c, ns)
 	})
 
 	It("should creating a working zookeeper cluster", func() {
@@ -504,7 +490,7 @@ type statefulPodTester interface {
 type clusterAppTester struct {
 	ns          string
 	statefulPod statefulPodTester
-	tester      *statefulSetTester
+	tester      *framework.StatefulSetTester
 }
 
 func (c *clusterAppTester) run() {
@@ -520,8 +506,8 @@ func (c *clusterAppTester) run() {
 	default:
 		if restartCluster {
 			By("Restarting stateful set " + ss.Name)
-			c.tester.restart(ss)
-			c.tester.waitForRunningAndReady(*ss.Spec.Replicas, ss)
+			c.tester.Restart(ss)
+			c.tester.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
 		}
 	}
 
|
|||||||
|
|
||||||
type zookeeperTester struct {
|
type zookeeperTester struct {
|
||||||
ss *apps.StatefulSet
|
ss *apps.StatefulSet
|
||||||
tester *statefulSetTester
|
tester *framework.StatefulSetTester
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *zookeeperTester) name() string {
|
func (z *zookeeperTester) name() string {
|
||||||
@ -541,7 +527,7 @@ func (z *zookeeperTester) name() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (z *zookeeperTester) deploy(ns string) *apps.StatefulSet {
|
func (z *zookeeperTester) deploy(ns string) *apps.StatefulSet {
|
||||||
z.ss = z.tester.createStatefulSet(zookeeperManifestPath, ns)
|
z.ss = z.tester.CreateStatefulSet(zookeeperManifestPath, ns)
|
||||||
return z.ss
|
return z.ss
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -563,7 +549,7 @@ func (z *zookeeperTester) read(statefulPodIndex int, key string) string {
|
|||||||
|
|
||||||
type mysqlGaleraTester struct {
|
type mysqlGaleraTester struct {
|
||||||
ss *apps.StatefulSet
|
ss *apps.StatefulSet
|
||||||
tester *statefulSetTester
|
tester *framework.StatefulSetTester
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mysqlGaleraTester) name() string {
|
func (m *mysqlGaleraTester) name() string {
|
||||||
@ -579,7 +565,7 @@ func (m *mysqlGaleraTester) mysqlExec(cmd, ns, podName string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *mysqlGaleraTester) deploy(ns string) *apps.StatefulSet {
|
func (m *mysqlGaleraTester) deploy(ns string) *apps.StatefulSet {
|
||||||
m.ss = m.tester.createStatefulSet(mysqlGaleraManifestPath, ns)
|
m.ss = m.tester.CreateStatefulSet(mysqlGaleraManifestPath, ns)
|
||||||
|
|
||||||
framework.Logf("Deployed statefulset %v, initializing database", m.ss.Name)
|
framework.Logf("Deployed statefulset %v, initializing database", m.ss.Name)
|
||||||
for _, cmd := range []string{
|
for _, cmd := range []string{
|
||||||
@ -606,7 +592,7 @@ func (m *mysqlGaleraTester) read(statefulPodIndex int, key string) string {
|
|||||||
|
|
||||||
type redisTester struct {
|
type redisTester struct {
|
||||||
ss *apps.StatefulSet
|
ss *apps.StatefulSet
|
||||||
tester *statefulSetTester
|
tester *framework.StatefulSetTester
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *redisTester) name() string {
|
func (m *redisTester) name() string {
|
||||||
@ -619,7 +605,7 @@ func (m *redisTester) redisExec(cmd, ns, podName string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *redisTester) deploy(ns string) *apps.StatefulSet {
|
func (m *redisTester) deploy(ns string) *apps.StatefulSet {
|
||||||
m.ss = m.tester.createStatefulSet(redisManifestPath, ns)
|
m.ss = m.tester.CreateStatefulSet(redisManifestPath, ns)
|
||||||
return m.ss
|
return m.ss
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -637,7 +623,7 @@ func (m *redisTester) read(statefulPodIndex int, key string) string {
|
|||||||
|
|
||||||
type cockroachDBTester struct {
|
type cockroachDBTester struct {
|
||||||
ss *apps.StatefulSet
|
ss *apps.StatefulSet
|
||||||
tester *statefulSetTester
|
tester *framework.StatefulSetTester
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cockroachDBTester) name() string {
|
func (c *cockroachDBTester) name() string {
|
||||||
@ -650,7 +636,7 @@ func (c *cockroachDBTester) cockroachDBExec(cmd, ns, podName string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *cockroachDBTester) deploy(ns string) *apps.StatefulSet {
|
func (c *cockroachDBTester) deploy(ns string) *apps.StatefulSet {
|
||||||
c.ss = c.tester.createStatefulSet(cockroachDBManifestPath, ns)
|
c.ss = c.tester.CreateStatefulSet(cockroachDBManifestPath, ns)
|
||||||
framework.Logf("Deployed statefulset %v, initializing database", c.ss.Name)
|
framework.Logf("Deployed statefulset %v, initializing database", c.ss.Name)
|
||||||
for _, cmd := range []string{
|
for _, cmd := range []string{
|
||||||
"CREATE DATABASE IF NOT EXISTS foo;",
|
"CREATE DATABASE IF NOT EXISTS foo;",
|
||||||
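
Note the shape the per-database testers share: clusterAppTester only needs something that can deploy a set and read or write keyed data on a given pod index. The interface named in the hunk header above (statefulPodTester) captures that contract; the following is a reconstruction from the call sites visible in this diff (deploy in each tester, statefulPod.read in pollReadWithTimeout), so treat the exact method set as an approximation:

// Approximate contract implemented by zookeeperTester, mysqlGaleraTester,
// redisTester and cockroachDBTester; reconstructed from call sites, not
// copied verbatim from the source.
type statefulPodTester interface {
	deploy(ns string) *apps.StatefulSet
	write(statefulPodIndex int, kv map[string]string)
	read(statefulPodIndex int, key string) string
	name() string
}
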
@@ -678,370 +664,6 @@ func lastLine(out string) string {
 	return outLines[len(outLines)-1]
 }

-func statefulSetFromManifest(fileName, ns string) *apps.StatefulSet {
-	var ss apps.StatefulSet
-	framework.Logf("Parsing statefulset from %v", fileName)
-	data, err := ioutil.ReadFile(fileName)
-	Expect(err).NotTo(HaveOccurred())
-	json, err := utilyaml.ToJSON(data)
-	Expect(err).NotTo(HaveOccurred())
-
-	Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &ss)).NotTo(HaveOccurred())
-	ss.Namespace = ns
-	if ss.Spec.Selector == nil {
-		ss.Spec.Selector = &metav1.LabelSelector{
-			MatchLabels: ss.Spec.Template.Labels,
-		}
-	}
-	return &ss
-}
-
-// statefulSetTester has all methods required to test a single statefulset.
-type statefulSetTester struct {
-	c clientset.Interface
-}
-
-func (s *statefulSetTester) createStatefulSet(manifestPath, ns string) *apps.StatefulSet {
-	mkpath := func(file string) string {
-		return filepath.Join(framework.TestContext.RepoRoot, manifestPath, file)
-	}
-	ss := statefulSetFromManifest(mkpath("statefulset.yaml"), ns)
-
-	framework.Logf(fmt.Sprintf("creating " + ss.Name + " service"))
-	framework.RunKubectlOrDie("create", "-f", mkpath("service.yaml"), fmt.Sprintf("--namespace=%v", ns))
-
-	framework.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector))
-	framework.RunKubectlOrDie("create", "-f", mkpath("statefulset.yaml"), fmt.Sprintf("--namespace=%v", ns))
-	s.waitForRunningAndReady(*ss.Spec.Replicas, ss)
-	return ss
-}
-
-func (s *statefulSetTester) checkMount(ss *apps.StatefulSet, mountPath string) error {
-	for _, cmd := range []string{
-		// Print inode, size etc
-		fmt.Sprintf("ls -idlhZ %v", mountPath),
-		// Print subdirs
-		fmt.Sprintf("find %v", mountPath),
-		// Try writing
-		fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))),
-	} {
-		if err := s.execInStatefulPods(ss, cmd); err != nil {
-			return fmt.Errorf("failed to execute %v, error: %v", cmd, err)
-		}
-	}
-	return nil
-}
-
-func (s *statefulSetTester) execInStatefulPods(ss *apps.StatefulSet, cmd string) error {
-	podList := s.getPodList(ss)
-	for _, statefulPod := range podList.Items {
-		stdout, err := framework.RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
-		framework.Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (s *statefulSetTester) checkHostname(ss *apps.StatefulSet) error {
-	cmd := "printf $(hostname)"
-	podList := s.getPodList(ss)
-	for _, statefulPod := range podList.Items {
-		hostname, err := framework.RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
-		if err != nil {
-			return err
-		}
-		if hostname != statefulPod.Name {
-			return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name)
-		}
-	}
-	return nil
-}
-func (s *statefulSetTester) saturate(ss *apps.StatefulSet) {
-	// TODO: Watch events and check that creation timestamss don't overlap
-	var i int32
-	for i = 0; i < *(ss.Spec.Replicas); i++ {
-		framework.Logf("Waiting for stateful pod at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
-		s.waitForRunningAndReady(i+1, ss)
-		framework.Logf("Marking stateful pod at index " + fmt.Sprintf("%v", i) + " healthy")
-		s.setHealthy(ss)
-	}
-}
-
-func (s *statefulSetTester) deleteStatefulPodAtIndex(index int, ss *apps.StatefulSet) {
-	name := getPodNameAtIndex(index, ss)
-	noGrace := int64(0)
-	if err := s.c.Core().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
-		framework.Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err)
-	}
-}
-
-type verifyPodFunc func(*v1.Pod)
-
-func (s *statefulSetTester) verifyPodAtIndex(index int, ss *apps.StatefulSet, verify verifyPodFunc) {
-	name := getPodNameAtIndex(index, ss)
-	pod, err := s.c.Core().Pods(ss.Namespace).Get(name, metav1.GetOptions{})
-	Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get stateful pod %s for StatefulSet %s/%s", name, ss.Namespace, ss.Name))
-	verify(pod)
-}
-
-func getPodNameAtIndex(index int, ss *apps.StatefulSet) string {
-	// TODO: we won't use "-index" as the name strategy forever,
-	// pull the name out from an identity mapper.
-	return fmt.Sprintf("%v-%v", ss.Name, index)
-}
-
-func (s *statefulSetTester) scale(ss *apps.StatefulSet, count int32) error {
-	name := ss.Name
-	ns := ss.Namespace
-	s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count })
-
-	var statefulPodList *v1.PodList
-	pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
-		statefulPodList = s.getPodList(ss)
-		if int32(len(statefulPodList.Items)) == count {
-			return true, nil
-		}
-		return false, nil
-	})
-	if pollErr != nil {
-		unhealthy := []string{}
-		for _, statefulPod := range statefulPodList.Items {
-			delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, v1.IsPodReady(&statefulPod)
-			if delTs != nil || phase != v1.PodRunning || !readiness {
-				unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
-			}
-		}
-		return fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, statefulsetTimeout, unhealthy)
-	}
-	return nil
-}
-
-func (s *statefulSetTester) updateReplicas(ss *apps.StatefulSet, count int32) {
-	s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { ss.Spec.Replicas = &count })
-}
-
-func (s *statefulSetTester) restart(ss *apps.StatefulSet) {
-	oldReplicas := *(ss.Spec.Replicas)
-	framework.ExpectNoError(s.scale(ss, 0))
-	s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas })
-}
-
-func (s *statefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) {
-	for i := 0; i < 3; i++ {
-		ss, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
-		if err != nil {
-			framework.Failf("failed to get statefulset %q: %v", name, err)
-		}
-		update(ss)
-		ss, err = s.c.Apps().StatefulSets(ns).Update(ss)
-		if err == nil {
-			return
-		}
-		if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
-			framework.Failf("failed to update statefulset %q: %v", name, err)
-		}
-	}
-	framework.Failf("too many retries draining statefulset %q", name)
-}
-
-func (s *statefulSetTester) getPodList(ss *apps.StatefulSet) *v1.PodList {
-	selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
-	framework.ExpectNoError(err)
-	podList, err := s.c.Core().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
-	framework.ExpectNoError(err)
-	return podList
-}
-
-func (s *statefulSetTester) confirmStatefulPodCount(count int, ss *apps.StatefulSet, timeout time.Duration) {
-	start := time.Now()
-	deadline := start.Add(timeout)
-	for t := time.Now(); t.Before(deadline); t = time.Now() {
-		podList := s.getPodList(ss)
-		statefulPodCount := len(podList.Items)
-		if statefulPodCount != count {
-			framework.Failf("StatefulSet %v scaled unexpectedly scaled to %d -> %d replicas: %+v", ss.Name, count, len(podList.Items), podList)
-		}
-		framework.Logf("Verifying statefulset %v doesn't scale past %d for another %+v", ss.Name, count, deadline.Sub(t))
-		time.Sleep(1 * time.Second)
-	}
-}
-
-func (s *statefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.StatefulSet, shouldBeReady bool) {
-	pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout,
-		func() (bool, error) {
-			podList := s.getPodList(ss)
-			if int32(len(podList.Items)) < numStatefulPods {
-				framework.Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numStatefulPods)
-				return false, nil
-			}
-			if int32(len(podList.Items)) > numStatefulPods {
-				return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numStatefulPods, len(podList.Items))
-			}
-			for _, p := range podList.Items {
-				isReady := v1.IsPodReady(&p)
-				desiredReadiness := shouldBeReady == isReady
-				framework.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
-				if p.Status.Phase != v1.PodRunning || !desiredReadiness {
-					return false, nil
-				}
-			}
-			return true, nil
-		})
-	if pollErr != nil {
-		framework.Failf("Failed waiting for pods to enter running: %v", pollErr)
-	}
-}
-
-func (s *statefulSetTester) waitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) {
-	s.waitForRunning(numStatefulPods, ss, true)
-}
-
-func (s *statefulSetTester) waitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) {
-	s.waitForRunning(numStatefulPods, ss, false)
-}
-
-func (s *statefulSetTester) breakProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
-	path := probe.HTTPGet.Path
-	if path == "" {
-		return fmt.Errorf("Path expected to be not empty: %v", path)
-	}
-	cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path)
-	return s.execInStatefulPods(ss, cmd)
-}
-
-func (s *statefulSetTester) restoreProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
-	path := probe.HTTPGet.Path
-	if path == "" {
-		return fmt.Errorf("Path expected to be not empty: %v", path)
-	}
-	cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path)
-	return s.execInStatefulPods(ss, cmd)
-}
-
-func (s *statefulSetTester) setHealthy(ss *apps.StatefulSet) {
-	podList := s.getPodList(ss)
-	markedHealthyPod := ""
-	for _, pod := range podList.Items {
-		if pod.Status.Phase != v1.PodRunning {
-			framework.Failf("Found pod in %v cannot set health", pod.Status.Phase)
-		}
-		if isInitialized(pod) {
-			continue
-		}
-		if markedHealthyPod != "" {
-			framework.Failf("Found multiple non-healthy stateful pods: %v and %v", pod.Name, markedHealthyPod)
-		}
-		p, err := framework.UpdatePodWithRetries(s.c, pod.Namespace, pod.Name, func(update *v1.Pod) {
-			update.Annotations[apps.StatefulSetInitAnnotation] = "true"
-		})
-		framework.ExpectNoError(err)
-		framework.Logf("Set annotation %v to %v on pod %v", apps.StatefulSetInitAnnotation, p.Annotations[apps.StatefulSetInitAnnotation], pod.Name)
-		markedHealthyPod = pod.Name
-	}
-}
-
-func (s *statefulSetTester) waitForStatus(ss *apps.StatefulSet, expectedReplicas int32) {
-	framework.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
-
-	ns, name := ss.Namespace, ss.Name
-	pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout,
-		func() (bool, error) {
-			ssGet, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
-			if err != nil {
-				return false, err
-			}
-			if ssGet.Status.Replicas != expectedReplicas {
-				framework.Logf("Waiting for stateful set status to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas)
-				return false, nil
-			}
-			return true, nil
-		})
-	if pollErr != nil {
-		framework.Failf("Failed waiting for stateful set status.replicas updated to %d: %v", expectedReplicas, pollErr)
-	}
-}
-
-func (p *statefulSetTester) checkServiceName(ps *apps.StatefulSet, expectedServiceName string) error {
-	framework.Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName)
-
-	if expectedServiceName != ps.Spec.ServiceName {
-		return fmt.Errorf("Wrong service name governing statefulset. Expected %s got %s", expectedServiceName, ps.Spec.ServiceName)
-	}
-
-	return nil
-}
-
-func deleteAllStatefulSets(c clientset.Interface, ns string) {
-	sst := &statefulSetTester{c: c}
-	ssList, err := c.Apps().StatefulSets(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
-	framework.ExpectNoError(err)
-
-	// Scale down each statefulset, then delete it completely.
-	// Deleting a pvc without doing this will leak volumes, #25101.
-	errList := []string{}
-	for _, ss := range ssList.Items {
-		framework.Logf("Scaling statefulset %v to 0", ss.Name)
-		if err := sst.scale(&ss, 0); err != nil {
-			errList = append(errList, fmt.Sprintf("%v", err))
-		}
-		sst.waitForStatus(&ss, 0)
-		framework.Logf("Deleting statefulset %v", ss.Name)
-		if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil {
-			errList = append(errList, fmt.Sprintf("%v", err))
-		}
-	}
-
-	// pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs.
-	pvNames := sets.NewString()
-	// TODO: Don't assume all pvcs in the ns belong to a statefulset
-	pvcPollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
-		pvcList, err := c.Core().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
-		if err != nil {
-			framework.Logf("WARNING: Failed to list pvcs, retrying %v", err)
-			return false, nil
-		}
-		for _, pvc := range pvcList.Items {
-			pvNames.Insert(pvc.Spec.VolumeName)
-			// TODO: Double check that there are no pods referencing the pvc
-			framework.Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName)
-			if err := c.Core().PersistentVolumeClaims(ns).Delete(pvc.Name, nil); err != nil {
-				return false, nil
-			}
-		}
-		return true, nil
-	})
-	if pvcPollErr != nil {
-		errList = append(errList, "Timeout waiting for pvc deletion.")
-	}
-
-	pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
-		pvList, err := c.Core().PersistentVolumes().List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
-		if err != nil {
-			framework.Logf("WARNING: Failed to list pvs, retrying %v", err)
-			return false, nil
-		}
-		waitingFor := []string{}
-		for _, pv := range pvList.Items {
-			if pvNames.Has(pv.Name) {
-				waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status))
-			}
-		}
-		if len(waitingFor) == 0 {
-			return true, nil
-		}
-		framework.Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n"))
-		return false, nil
-	})
-	if pollErr != nil {
-		errList = append(errList, "Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs.")
-	}
-	if len(errList) != 0 {
-		framework.ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n")))
-	}
-}
-
 func pollReadWithTimeout(statefulPod statefulPodTester, statefulPodNumber int, key, expectedVal string) error {
 	err := wait.PollImmediate(time.Second, readTimeout, func() (bool, error) {
 		val := statefulPod.read(statefulPodNumber, key)
@@ -1058,100 +680,3 @@ func pollReadWithTimeout(statefulPod statefulPodTester, statefulPodNumber int, k
 	}
 	return err
 }
-
-func isInitialized(pod v1.Pod) bool {
-	initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation]
-	if !ok {
-		return false
-	}
-	inited, err := strconv.ParseBool(initialized)
-	if err != nil {
-		framework.Failf("Couldn't parse statefulset init annotations %v", initialized)
-	}
-	return inited
-}
-
-func newPVC(name string) v1.PersistentVolumeClaim {
-	return v1.PersistentVolumeClaim{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: name,
-			Annotations: map[string]string{
-				"volume.alpha.kubernetes.io/storage-class": "anything",
-			},
-		},
-		Spec: v1.PersistentVolumeClaimSpec{
-			AccessModes: []v1.PersistentVolumeAccessMode{
-				v1.ReadWriteOnce,
-			},
-			Resources: v1.ResourceRequirements{
-				Requests: v1.ResourceList{
-					v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
-				},
-			},
-		},
-	}
-}
-
-func newStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *apps.StatefulSet {
-	mounts := append(statefulPodMounts, podMounts...)
-	claims := []v1.PersistentVolumeClaim{}
-	for _, m := range statefulPodMounts {
-		claims = append(claims, newPVC(m.Name))
-	}
-
-	vols := []v1.Volume{}
-	for _, m := range podMounts {
-		vols = append(vols, v1.Volume{
-			Name: m.Name,
-			VolumeSource: v1.VolumeSource{
-				HostPath: &v1.HostPathVolumeSource{
-					Path: fmt.Sprintf("/tmp/%v", m.Name),
-				},
-			},
-		})
-	}
-
-	privileged := true
-
-	return &apps.StatefulSet{
-		TypeMeta: metav1.TypeMeta{
-			Kind:       "StatefulSet",
-			APIVersion: "apps/v1beta1",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      name,
-			Namespace: ns,
-		},
-		Spec: apps.StatefulSetSpec{
-			Selector: &metav1.LabelSelector{
-				MatchLabels: labels,
-			},
-			Replicas: func(i int32) *int32 { return &i }(replicas),
-			Template: v1.PodTemplateSpec{
-				ObjectMeta: metav1.ObjectMeta{
-					Labels:      labels,
-					Annotations: map[string]string{},
-				},
-				Spec: v1.PodSpec{
-					Containers: []v1.Container{
-						{
-							Name:         "nginx",
-							Image:        nginxImage,
-							VolumeMounts: mounts,
-							SecurityContext: &v1.SecurityContext{
-								Privileged: &privileged,
-							},
-						},
-					},
-					Volumes: vols,
-				},
-			},
-			VolumeClaimTemplates: claims,
-			ServiceName:          governingSvcName,
-		},
-	}
-}
-
-func setInitializedAnnotation(ss *apps.StatefulSet, value string) {
-	ss.Spec.Template.ObjectMeta.Annotations["pod.alpha.kubernetes.io/initialized"] = value
-}
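
The block deleted above is the substance of the move: these helpers now live in test/e2e/framework/statefulset_utils.go (added earlier in this diff) under exported names, and only the clustered-application plumbing stays behind in the suite. The deleted deleteAllStatefulSets also documents the teardown order the exported replacement preserves: scale to zero, wait for status.replicas to reach zero, delete the set, then delete PVCs and wait for their PVs to go away. A condensed sketch of that order against the exported API; Scale and WaitForStatus are assumed to be the exported counterparts of the scale and waitForStatus methods shown above:

// Condensed teardown order, written against the exported framework API.
// Scale/WaitForStatus are assumed exports of the deleted scale/waitForStatus.
sst := framework.NewStatefulSetTester(c)
ssList, err := c.Apps().StatefulSets(ns).List(metav1.ListOptions{})
framework.ExpectNoError(err)
for i := range ssList.Items {
	ss := &ssList.Items[i]
	framework.ExpectNoError(sst.Scale(ss, 0)) // drain pods first; deleting PVCs under live pods leaks volumes (#25101)
	sst.WaitForStatus(ss, 0)                  // wait for status.replicas to reach zero
	framework.ExpectNoError(c.Apps().StatefulSets(ns).Delete(ss.Name, nil))
}
// Only then delete each PVC and poll until the bound PVs disappear.
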
@@ -13,15 +13,18 @@ go_library(
         "deployments.go",
         "secrets.go",
         "services.go",
+        "statefulset.go",
         "upgrade.go",
     ],
     tags = ["automanaged"],
     deps = [
         "//pkg/api/v1:go_default_library",
+        "//pkg/apis/apps/v1beta1:go_default_library",
         "//pkg/apis/extensions/v1beta1:go_default_library",
         "//pkg/controller/deployment/util:go_default_library",
         "//test/e2e/framework:go_default_library",
         "//vendor:github.com/onsi/ginkgo",
+        "//vendor:github.com/onsi/gomega",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/util/uuid",
         "//vendor:k8s.io/apimachinery/pkg/util/wait",
99	test/e2e/upgrades/statefulset.go	Normal file
@@ -0,0 +1,99 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package upgrades
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	"k8s.io/kubernetes/pkg/api/v1"
+	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
+
+	"k8s.io/kubernetes/test/e2e/framework"
+)
+
+// StatefulSetUpgradeTest implements an upgrade test harness for StatefulSet upgrade testing.
+type StatefulSetUpgradeTest struct {
+	tester  *framework.StatefulSetTester
+	service *v1.Service
+	set     *apps.StatefulSet
+}
+
+// Setup creates a StatefulSet and a HeadlessService. It verifies the basic SatefulSet properties
+func (t *StatefulSetUpgradeTest) Setup(f *framework.Framework) {
+	ssName := "ss"
+	labels := map[string]string{
+		"foo": "bar",
+		"baz": "blah",
+	}
+	headlessSvcName := "test"
+	statefulPodMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
+	podMounts := []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
+	ns := f.Namespace.Name
+	t.set = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
+	t.service = framework.CreateStatefulSetService(ssName, labels)
+	*(t.set.Spec.Replicas) = 3
+	framework.SetStatefulSetInitializedAnnotation(t.set, "false")
+
+	By("Creating service " + headlessSvcName + " in namespace " + ns)
+	_, err := f.ClientSet.Core().Services(ns).Create(t.service)
+	Expect(err).NotTo(HaveOccurred())
+	t.tester = framework.NewStatefulSetTester(f.ClientSet)
+
+	By("Creating statefulset " + ssName + " in namespace " + ns)
+	*(t.set.Spec.Replicas) = 3
+	_, err = f.ClientSet.Apps().StatefulSets(ns).Create(t.set)
+	Expect(err).NotTo(HaveOccurred())
+
+	By("Saturating stateful set " + t.set.Name)
+	t.tester.Saturate(t.set)
+	t.verify()
+	t.restart()
+	t.verify()
+}
+
+// Waits for the upgrade to complete and verifies the StatefulSet basic functionality
+func (t *StatefulSetUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) {
+	<-done
+	t.verify()
+}
+
+// Deletes all StatefulSets
+func (t *StatefulSetUpgradeTest) Teardown(f *framework.Framework) {
+	framework.DeleteAllStatefulSets(f.ClientSet, t.set.Name)
+}
+
+func (t *StatefulSetUpgradeTest) verify() {
+	By("Verifying statefulset mounted data directory is usable")
+	framework.ExpectNoError(t.tester.CheckMount(t.set, "/data"))
+
+	By("Verifying statefulset provides a stable hostname for each pod")
+	framework.ExpectNoError(t.tester.CheckHostname(t.set))
+
+	By("Verifying statefulset set proper service name")
+	framework.ExpectNoError(t.tester.CheckServiceName(t.set, t.set.Spec.ServiceName))
+
+	cmd := "echo $(hostname) > /data/hostname; sync;"
+	By("Running " + cmd + " in all stateful pods")
+	framework.ExpectNoError(t.tester.ExecInStatefulPods(t.set, cmd))
+}
+
+func (t *StatefulSetUpgradeTest) restart() {
+	By("Restarting statefulset " + t.set.Name)
+	t.tester.Restart(t.set)
+	t.tester.Saturate(t.set)
+}
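
StatefulSetUpgradeTest satisfies the upgrades.Test lifecycle (Setup before the upgrade, Test across it, Teardown after), which is what lets it be registered beside the other upgrade tests. A sketch of the call order that lifecycle implies; the driver function below is illustrative rather than the actual cluster-upgrade wiring, and the MasterUpgrade constant is assumed from the UpgradeType parameter:

// Illustrative driver showing the order in which an upgrades.Test is
// exercised; the real runner lives in the cluster upgrade spec.
func runUpgradeTest(f *framework.Framework, t upgrades.Test, doUpgrade func() error) {
	t.Setup(f) // deploy, saturate and verify the set before upgrading

	done := make(chan struct{})
	go func() {
		defer close(done)
		framework.ExpectNoError(doUpgrade()) // e.g. upgrade the master
	}()

	t.Test(f, done, upgrades.MasterUpgrade) // blocks on <-done, then re-verifies
	t.Teardown(f)
}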