diff --git a/test/integration/BUILD b/test/integration/BUILD index 67c3f1bfcf9..d6e3065179b 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -62,6 +62,7 @@ filegroup( "//test/integration/scheduler_perf:all-srcs", "//test/integration/secrets:all-srcs", "//test/integration/serviceaccount:all-srcs", + "//test/integration/statefulset:all-srcs", "//test/integration/storageclasses:all-srcs", "//test/integration/tls:all-srcs", "//test/integration/ttlcontroller:all-srcs", diff --git a/test/integration/statefulset/BUILD b/test/integration/statefulset/BUILD new file mode 100644 index 00000000000..517987a4d47 --- /dev/null +++ b/test/integration/statefulset/BUILD @@ -0,0 +1,57 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["util.go"], + importpath = "k8s.io/kubernetes/test/integration/statefulset", + visibility = ["//visibility:public"], + deps = [ + #"//pkg/api:go_default_library", + "//pkg/controller/statefulset:go_default_library", + "//test/integration/framework:go_default_library", + "//vendor/k8s.io/api/apps/v1beta1:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/apps/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/util/retry:go_default_library", + ], +) + +go_test( + name = "go_default_test", + size = "large", + srcs = [ + "main_test.go", + "statefulset_test.go", + ], + embed = [":go_default_library"], + tags = ["integration"], + deps = [ + "//test/integration/framework:go_default_library", + "//vendor/k8s.io/api/apps/v1beta1:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/test/integration/statefulset/OWNERS b/test/integration/statefulset/OWNERS new file mode 100644 index 00000000000..ef439d8b051 --- /dev/null +++ b/test/integration/statefulset/OWNERS @@ -0,0 +1,6 @@ +reviewers: + - barney-s + - crimsonfaith91 +approvers: + - barney-s + - crimsonfaith91 diff --git a/test/integration/statefulset/main_test.go b/test/integration/statefulset/main_test.go new file mode 100644 index 00000000000..4527b42bab0 --- /dev/null +++ b/test/integration/statefulset/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statefulset + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/statefulset/statefulset_test.go b/test/integration/statefulset/statefulset_test.go new file mode 100644 index 00000000000..d4946aa4f40 --- /dev/null +++ b/test/integration/statefulset/statefulset_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statefulset + +import ( + "fmt" + "testing" + + "k8s.io/api/apps/v1beta1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/test/integration/framework" +) + +func TestSpecReplicasChange(t *testing.T) { + s, closeFn, rm, informers, c := scSetup(t) + defer closeFn() + ns := framework.CreateTestingNamespace("test-spec-replicas-change", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + stopCh := runControllerAndInformers(rm, informers) + defer close(stopCh) + + createHeadlessService(t, c, newHeadlessService(ns.Name)) + sts := newSTS("sts", ns.Name, 2) + stss, _ := createSTSsPods(t, c, []*v1beta1.StatefulSet{sts}, []*v1.Pod{}) + sts = stss[0] + waitSTSStable(t, c, sts) + + // Update .Spec.Replicas and verify .Status.Replicas is changed accordingly + scaleSTS(t, c, sts, 3) + scaleSTS(t, c, sts, 0) + scaleSTS(t, c, sts, 2) + + // Add a template annotation change to test STS's status does update + // without .Spec.Replicas change + stsClient := c.AppsV1beta1().StatefulSets(ns.Name) + var oldGeneration int64 + newSTS := updateSTS(t, stsClient, sts.Name, func(sts *v1beta1.StatefulSet) { + oldGeneration = sts.Generation + sts.Spec.Template.Annotations = map[string]string{"test": "annotation"} + }) + savedGeneration := newSTS.Generation + if savedGeneration == oldGeneration { + t.Fatalf("failed to verify .Generation has incremented for sts %s", sts.Name) + } + + if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { + newSTS, err := stsClient.Get(sts.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return *newSTS.Status.ObservedGeneration >= savedGeneration, nil + }); err != nil { + t.Fatalf("failed to verify .Status.ObservedGeneration has incremented for sts %s: %v", sts.Name, err) + } +} + +func TestDeletingAndFailedPods(t *testing.T) { + s, closeFn, rm, informers, c := scSetup(t) + defer closeFn() + ns := framework.CreateTestingNamespace("test-deleting-and-failed-pods", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + stopCh := runControllerAndInformers(rm, informers) + defer close(stopCh) + + labelMap := labelMap() + sts := newSTS("sts", ns.Name, 2) + stss, _ := createSTSsPods(t, c, []*v1beta1.StatefulSet{sts}, []*v1.Pod{}) + sts = stss[0] + waitSTSStable(t, c, sts) + + // Verify STS creates 2 pods + podClient := c.Core().Pods(ns.Name) + pods := getPods(t, podClient, labelMap) + if len(pods.Items) != 2 { + t.Fatalf("len(pods) = %d, want 2", len(pods.Items)) + } + + // Set first pod as deleting pod + // Set finalizers for the pod to simulate pending deletion status + deletingPod := &pods.Items[0] + updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) { + pod.Finalizers = []string{"fake.example.com/blockDeletion"} + }) + if err := c.Core().Pods(ns.Name).Delete(deletingPod.Name, &metav1.DeleteOptions{}); err != nil { + t.Fatalf("error deleting pod %s: %v", deletingPod.Name, err) + } + + // Set second pod as failed pod + failedPod := &pods.Items[1] + updatePodStatus(t, podClient, failedPod.Name, func(pod *v1.Pod) { + pod.Status.Phase = v1.PodFailed + }) + + if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { + // Verify only 2 pods exist: deleting pod and new pod replacing failed pod + pods = getPods(t, podClient, labelMap) + if len(pods.Items) != 2 { + return false, nil + } + // Verify deleting pod still exists + // Immediately return false with an error if it does not exist + if pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID { + return false, fmt.Errorf("expected deleting pod %s still exists, but it is not found", deletingPod.Name) + } + // Verify failed pod does not exist anymore + if pods.Items[0].UID == failedPod.UID || pods.Items[1].UID == failedPod.UID { + return false, nil + } + // Verify both pods have non-failed status + return pods.Items[0].Status.Phase != v1.PodFailed && pods.Items[1].Status.Phase != v1.PodFailed, nil + }); err != nil { + t.Fatalf("failed to verify failed pod %s has been replaced with a new non-failed pod, and deleting pod %s survives: %v", failedPod.Name, deletingPod.Name, err) + } + + // Remove finalizers of deleting pod to simulate successful deletion + updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) { + pod.Finalizers = []string{} + }) + + if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { + // Verify only 2 pods exist: new non-deleting pod replacing deleting pod and the non-failed pod + pods = getPods(t, podClient, labelMap) + if len(pods.Items) != 2 { + return false, nil + } + // Verify deleting pod does not exist anymore + return pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID, nil + }); err != nil { + t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err) + } +} diff --git a/test/integration/statefulset/util.go b/test/integration/statefulset/util.go new file mode 100644 index 00000000000..78ee9940140 --- /dev/null +++ b/test/integration/statefulset/util.go @@ -0,0 +1,319 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statefulset + +import ( + "fmt" + "net/http/httptest" + "testing" + "time" + + "k8s.io/api/apps/v1beta1" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + typedv1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" + typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/util/retry" + //svc "k8s.io/kubernetes/pkg/api/v1/service" + "k8s.io/kubernetes/pkg/controller/statefulset" + "k8s.io/kubernetes/test/integration/framework" +) + +const ( + pollInterval = 100 * time.Millisecond + pollTimeout = 60 * time.Second + + fakeImageName = "fake-name" + fakeImage = "fakeimage" +) + +type statefulsetTester struct { + t *testing.T + c clientset.Interface + service *v1.Service + statefulset *v1beta1.StatefulSet +} + +func labelMap() map[string]string { + return map[string]string{"foo": "bar"} +} + +// newService returns a service with a fake name for StatefulSet to be created soon +func newHeadlessService(namespace string) *v1.Service { + return &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "fake-service-name", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "None", + Ports: []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: "TCP"}, + }, + Selector: labelMap(), + }, + } +} + +// newSTS returns a StatefulSet with with a fake container image +func newSTS(name, namespace string, replicas int) *v1beta1.StatefulSet { + replicasCopy := int32(replicas) + return &v1beta1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1beta1.StatefulSetSpec{ + PodManagementPolicy: v1beta1.ParallelPodManagement, + Replicas: &replicasCopy, + Selector: &metav1.LabelSelector{ + MatchLabels: labelMap(), + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labelMap(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + VolumeMounts: []v1.VolumeMount{ + {Name: "datadir", MountPath: "/data/"}, + {Name: "home", MountPath: "/home"}, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "datadir", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: fmt.Sprintf("/tmp/%v", "datadir"), + }, + }, + }, + { + Name: "home", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: fmt.Sprintf("/tmp/%v", "home"), + }, + }, + }, + }, + }, + }, + ServiceName: "fake-service-name", + UpdateStrategy: v1beta1.StatefulSetUpdateStrategy{ + Type: v1beta1.RollingUpdateStatefulSetStrategyType, + }, + VolumeClaimTemplates: []v1.PersistentVolumeClaim{ + // for volume mount "datadir" + newStatefulSetPVC("fake-pvc-name"), + }, + }, + } +} + +func newStatefulSetPVC(name string) v1.PersistentVolumeClaim { + return v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Annotations: map[string]string{ + "volume.alpha.kubernetes.io/storage-class": "anything", + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI), + }, + }, + }, + } +} + +// scSetup sets up necessities for Statefulset integration test, including master, apiserver, informers, and clientset +func scSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) { + masterConfig := framework.NewIntegrationTestMasterConfig() + _, s, closeFn := framework.RunAMaster(masterConfig) + + config := restclient.Config{Host: s.URL} + clientSet, err := clientset.NewForConfig(&config) + if err != nil { + t.Fatalf("error in create clientset: %v", err) + } + resyncPeriod := 12 * time.Hour + informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod) + + sc := statefulset.NewStatefulSetController( + informers.Core().V1().Pods(), + informers.Apps().V1().StatefulSets(), + informers.Core().V1().PersistentVolumeClaims(), + informers.Apps().V1().ControllerRevisions(), + clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")), + ) + + return s, closeFn, sc, informers, clientSet +} + +// Run STS controller and informers +func runControllerAndInformers(sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) chan struct{} { + stopCh := make(chan struct{}) + informers.Start(stopCh) + go sc.Run(5, stopCh) + return stopCh +} + +func createHeadlessService(t *testing.T, clientSet clientset.Interface, headlessService *v1.Service) { + _, err := clientSet.Core().Services(headlessService.Namespace).Create(headlessService) + if err != nil { + t.Fatalf("failed creating headless service: %v", err) + } +} + +func createSTSsPods(t *testing.T, clientSet clientset.Interface, stss []*v1beta1.StatefulSet, pods []*v1.Pod) ([]*v1beta1.StatefulSet, []*v1.Pod) { + var createdSTSs []*v1beta1.StatefulSet + var createdPods []*v1.Pod + for _, sts := range stss { + createdSTS, err := clientSet.AppsV1beta1().StatefulSets(sts.Namespace).Create(sts) + if err != nil { + t.Fatalf("failed to create sts %s: %v", sts.Name, err) + } + createdSTSs = append(createdSTSs, createdSTS) + } + for _, pod := range pods { + createdPod, err := clientSet.Core().Pods(pod.Namespace).Create(pod) + if err != nil { + t.Fatalf("failed to create pod %s: %v", pod.Name, err) + } + createdPods = append(createdPods, createdPod) + } + + return createdSTSs, createdPods +} + +// Verify .Status.Replicas is equal to .Spec.Replicas +func waitSTSStable(t *testing.T, clientSet clientset.Interface, sts *v1beta1.StatefulSet) { + stsClient := clientSet.AppsV1beta1().StatefulSets(sts.Namespace) + desiredGeneration := sts.Generation + if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { + newSTS, err := stsClient.Get(sts.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return newSTS.Status.Replicas == *newSTS.Spec.Replicas && *newSTS.Status.ObservedGeneration >= desiredGeneration, nil + }); err != nil { + t.Fatalf("failed to verify .Status.Replicas is equal to .Spec.Replicas for sts %s: %v", sts.Name, err) + } +} + +func updatePod(t *testing.T, podClient typedv1.PodInterface, podName string, updateFunc func(*v1.Pod)) *v1.Pod { + var pod *v1.Pod + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + newPod, err := podClient.Get(podName, metav1.GetOptions{}) + if err != nil { + return err + } + updateFunc(newPod) + pod, err = podClient.Update(newPod) + return err + }); err != nil { + t.Fatalf("failed to update pod %s: %v", podName, err) + } + return pod +} + +func updatePodStatus(t *testing.T, podClient typedv1.PodInterface, podName string, updateStatusFunc func(*v1.Pod)) *v1.Pod { + var pod *v1.Pod + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + newPod, err := podClient.Get(podName, metav1.GetOptions{}) + if err != nil { + return err + } + updateStatusFunc(newPod) + pod, err = podClient.UpdateStatus(newPod) + return err + }); err != nil { + t.Fatalf("failed to update status of pod %s: %v", podName, err) + } + return pod +} + +func getPods(t *testing.T, podClient typedv1.PodInterface, labelMap map[string]string) *v1.PodList { + podSelector := labels.Set(labelMap).AsSelector() + options := metav1.ListOptions{LabelSelector: podSelector.String()} + pods, err := podClient.List(options) + if err != nil { + t.Fatalf("failed obtaining a list of pods that match the pod labels %v: %v", labelMap, err) + } + if pods == nil { + t.Fatalf("obtained a nil list of pods") + } + return pods +} + +func updateSTS(t *testing.T, stsClient typedv1beta1.StatefulSetInterface, stsName string, updateFunc func(*v1beta1.StatefulSet)) *v1beta1.StatefulSet { + var sts *v1beta1.StatefulSet + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + newSTS, err := stsClient.Get(stsName, metav1.GetOptions{}) + if err != nil { + return err + } + updateFunc(newSTS) + sts, err = stsClient.Update(newSTS) + return err + }); err != nil { + t.Fatalf("failed to update sts %s: %v", stsName, err) + } + return sts +} + +// Update .Spec.Replicas to replicas and verify .Status.Replicas is changed accordingly +func scaleSTS(t *testing.T, c clientset.Interface, sts *v1beta1.StatefulSet, replicas int32) { + stsClient := c.AppsV1beta1().StatefulSets(sts.Namespace) + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + newSTS, err := stsClient.Get(sts.Name, metav1.GetOptions{}) + if err != nil { + return err + } + *newSTS.Spec.Replicas = replicas + sts, err = stsClient.Update(newSTS) + return err + }); err != nil { + t.Fatalf("failed to update .Spec.Replicas to %d for sts %s: %v", replicas, sts.Name, err) + } + waitSTSStable(t, c, sts) +}