diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go
index ae410417592..2e57d87e264 100644
--- a/test/e2e/e2e.go
+++ b/test/e2e/e2e.go
@@ -72,7 +72,10 @@ func setupProviderConfig() error {
             managedZones = []string{zone}
         }
 
-        gceAlphaFeatureGate, err := gcecloud.NewAlphaFeatureGate([]string{gcecloud.AlphaFeatureNetworkEndpointGroup})
+        gceAlphaFeatureGate, err := gcecloud.NewAlphaFeatureGate([]string{
+            gcecloud.AlphaFeatureNetworkEndpointGroup,
+            gcecloud.AlphaFeatureGCEDisk,
+        })
         if err != nil {
             glog.Errorf("Encountered error for creating alpha feature gate: %v", err)
         }
diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD
index 3ad6f17222c..e98754cf4b7 100644
--- a/test/e2e/framework/BUILD
+++ b/test/e2e/framework/BUILD
@@ -62,6 +62,7 @@ go_library(
         "//pkg/controller/nodelifecycle:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/kubectl:go_default_library",
+        "//pkg/kubelet/apis:go_default_library",
         "//pkg/kubelet/apis/kubeletconfig:go_default_library",
         "//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
         "//pkg/kubelet/dockershim/metrics:go_default_library",
diff --git a/test/e2e/framework/google_compute.go b/test/e2e/framework/google_compute.go
index 4553a0db3a3..1738a55bdd3 100644
--- a/test/e2e/framework/google_compute.go
+++ b/test/e2e/framework/google_compute.go
@@ -124,3 +124,33 @@ func LogClusterImageSources() {
         Logf("cluster images sources, could not write to %q: %v", filePath, err)
     }
 }
+
+func CreateManagedInstanceGroup(size int64, zone, template string) error {
+    // TODO(verult): make this hit the compute API directly instead of
+    // shelling out to gcloud.
+    _, _, err := retryCmd("gcloud", "compute", "instance-groups", "managed",
+        "create",
+        fmt.Sprintf("--project=%s", TestContext.CloudConfig.ProjectID),
+        fmt.Sprintf("--zone=%s", zone),
+        TestContext.CloudConfig.NodeInstanceGroup,
+        fmt.Sprintf("--size=%d", size),
+        fmt.Sprintf("--template=%s", template))
+    if err != nil {
+        return fmt.Errorf("gcloud compute instance-groups managed create call failed with err: %v", err)
+    }
+    return nil
+}
+
+func DeleteManagedInstanceGroup(zone string) error {
+    // TODO(verult): make this hit the compute API directly instead of
+    // shelling out to gcloud.
+    _, _, err := retryCmd("gcloud", "compute", "instance-groups", "managed",
+        "delete",
+        fmt.Sprintf("--project=%s", TestContext.CloudConfig.ProjectID),
+        fmt.Sprintf("--zone=%s", zone),
+        TestContext.CloudConfig.NodeInstanceGroup)
+    if err != nil {
+        return fmt.Errorf("gcloud compute instance-groups managed delete call failed with err: %v", err)
+    }
+    return nil
+}
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index 7f6cb443c97..6467b3b23e1 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -90,6 +90,7 @@ import (
     nodectlr "k8s.io/kubernetes/pkg/controller/nodelifecycle"
     "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/kubectl"
+    kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
     "k8s.io/kubernetes/pkg/kubelet/util/format"
     "k8s.io/kubernetes/pkg/master/ports"
     "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@@ -336,6 +337,16 @@ func SkipUnlessProviderIs(supportedProviders ...string) {
     }
 }
 
+func SkipUnlessMultizone(c clientset.Interface) {
+    zones, err := GetClusterZones(c)
+    if err != nil {
+        Skipf("Error listing cluster zones")
+    }
+    if zones.Len() <= 1 {
+        Skipf("Requires more than one zone")
+    }
+}
+
 func SkipUnlessClusterMonitoringModeIs(supportedMonitoring ...string) {
     if !ClusterMonitoringModeIs(supportedMonitoring...) {
{ Skipf("Only next monitoring modes are supported %v (not %s)", supportedMonitoring, TestContext.ClusterMonitoringMode) @@ -897,6 +908,26 @@ func WaitForPersistentVolumePhase(phase v1.PersistentVolumePhase, c clientset.In return fmt.Errorf("PersistentVolume %s not in phase %s within %v", pvName, phase, timeout) } +// WaitForStatefulSetReplicasReady waits for all replicas of a StatefulSet to become ready or until timeout occurs, whichever comes first. +func WaitForStatefulSetReplicasReady(statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error { + Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) { + sts, err := c.AppsV1().StatefulSets(ns).Get(statefulSetName, metav1.GetOptions{}) + if err != nil { + Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err) + continue + } else { + if sts.Status.ReadyReplicas == *sts.Spec.Replicas { + Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start)) + return nil + } else { + Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas) + } + } + } + return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout) +} + // WaitForPersistentVolumeDeleted waits for a PersistentVolume to get deleted or until timeout occurs, whichever comes first. func WaitForPersistentVolumeDeleted(c clientset.Interface, pvName string, Poll, timeout time.Duration) error { Logf("Waiting up to %v for PersistentVolume %s to get deleted", timeout, pvName) @@ -4067,7 +4098,7 @@ func WaitForReadyNodes(c clientset.Interface, size int, timeout time.Duration) e Logf("Cluster has reached the desired number of ready nodes %d", size) return nil } - Logf("Waiting for ready nodes %d, current ready %d, not ready nodes %d", size, numNodes, numNodes-numReady) + Logf("Waiting for ready nodes %d, current ready %d, not ready nodes %d", size, numReady, numNodes-numReady) } return fmt.Errorf("timeout waiting %v for number of ready nodes to be %d", timeout, size) } @@ -5160,3 +5191,19 @@ func WaitForPersistentVolumeClaimDeleted(c clientset.Interface, ns string, pvcNa } return fmt.Errorf("PersistentVolumeClaim %s is not removed from the system within %v", pvcName, timeout) } + +func GetClusterZones(c clientset.Interface) (sets.String, error) { + nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("Error getting nodes while attempting to list cluster zones: %v", err) + } + + // collect values of zone label from all nodes + zones := sets.NewString() + for _, node := range nodes.Items { + if zone, found := node.Labels[kubeletapis.LabelZoneFailureDomain]; found { + zones.Insert(zone) + } + } + return zones, nil +} diff --git a/test/e2e/scheduling/ubernetes_lite.go b/test/e2e/scheduling/ubernetes_lite.go index bae5351a51c..3fdb711325f 100644 --- a/test/e2e/scheduling/ubernetes_lite.go +++ b/test/e2e/scheduling/ubernetes_lite.go @@ -125,6 +125,7 @@ func getZoneNameForNode(node v1.Node) (string, error) { node.Name, kubeletapis.LabelZoneFailureDomain) } +// TODO (verult) Merge with framework.GetClusterZones() // Find the names of all zones in which we have nodes in this cluster. 
 func getZoneNames(c clientset.Interface) ([]string, error) {
     zoneNames := sets.NewString()
diff --git a/test/e2e/storage/BUILD b/test/e2e/storage/BUILD
index 7f43c80a9f6..431322a375c 100644
--- a/test/e2e/storage/BUILD
+++ b/test/e2e/storage/BUILD
@@ -20,6 +20,7 @@ go_library(
         "persistent_volumes-local.go",
         "pv_protection.go",
         "pvc_protection.go",
+        "regional_pd.go",
         "volume_expand.go",
         "volume_io.go",
         "volume_metrics.go",
@@ -29,6 +30,7 @@ go_library(
     importpath = "k8s.io/kubernetes/test/e2e/storage",
     deps = [
         "//pkg/api/testapi:go_default_library",
+        "//pkg/api/v1/pod:go_default_library",
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/apis/storage/v1/util:go_default_library",
         "//pkg/client/conditions:go_default_library",
diff --git a/test/e2e/storage/regional_pd.go b/test/e2e/storage/regional_pd.go
new file mode 100644
index 00000000000..361474e0831
--- /dev/null
+++ b/test/e2e/storage/regional_pd.go
@@ -0,0 +1,450 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+    . "github.com/onsi/ginkgo"
+    . "github.com/onsi/gomega"
+
+    "fmt"
+    appsv1 "k8s.io/api/apps/v1"
+    "k8s.io/api/core/v1"
+    storage "k8s.io/api/storage/v1"
+    "k8s.io/apimachinery/pkg/api/resource"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/sets"
+    clientset "k8s.io/client-go/kubernetes"
+    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+    "k8s.io/kubernetes/pkg/kubelet/apis"
+    "k8s.io/kubernetes/pkg/volume/util"
+    "k8s.io/kubernetes/test/e2e/framework"
+    "k8s.io/kubernetes/test/e2e/storage/utils"
+    "strings"
+    "time"
+)
+
+const (
+    pvDeletionTimeout       = 3 * time.Minute
+    statefulSetReadyTimeout = 3 * time.Minute
+)
+
+var _ = utils.SIGDescribe("Regional PD [Feature:RegionalPD]", func() {
+    f := framework.NewDefaultFramework("regional-pd")
+
+    // filled in BeforeEach
+    var c clientset.Interface
+    var ns string
+
+    BeforeEach(func() {
+        c = f.ClientSet
+        ns = f.Namespace.Name
+
+        framework.SkipUnlessProviderIs("gce", "gke")
+        framework.SkipUnlessMultizone(c)
+    })
+
+    Describe("RegionalPD", func() {
+        It("should provision storage [Slow]", func() {
+            testVolumeProvisioning(c, ns)
+        })
+
+        It("should failover to a different zone when all nodes in one zone become unreachable [Slow] [Disruptive]", func() {
+            testZonalFailover(c, ns)
+        })
+
+    })
+})
+
+func testVolumeProvisioning(c clientset.Interface, ns string) {
+    cloudZones := getTwoRandomZones(c)
+
+    // This test checks that dynamic provisioning can provision a volume
+    // that can be used to persist data among pods.
+    tests := []storageClassTest{
+        {
+            name:           "HDD Regional PD on GCE/GKE",
+            cloudProviders: []string{"gce", "gke"},
+            provisioner:    "kubernetes.io/gce-pd",
+            parameters: map[string]string{
+                "type":             "pd-standard",
+                "zones":            strings.Join(cloudZones, ","),
+                "replication-type": "regional-pd",
+            },
+            claimSize:    "1.5G",
+            expectedSize: "2G",
+            pvCheck: func(volume *v1.PersistentVolume) error {
+                err := checkGCEPD(volume, "pd-standard")
+                if err != nil {
+                    return err
+                }
+                return verifyZonesInPV(volume, sets.NewString(cloudZones...), true /* match */)
+            },
+        },
+        {
+            name:           "HDD Regional PD with auto zone selection on GCE/GKE",
+            cloudProviders: []string{"gce", "gke"},
+            provisioner:    "kubernetes.io/gce-pd",
+            parameters: map[string]string{
+                "type":             "pd-standard",
+                "replication-type": "regional-pd",
+            },
+            claimSize:    "1.5G",
+            expectedSize: "2G",
+            pvCheck: func(volume *v1.PersistentVolume) error {
+                err := checkGCEPD(volume, "pd-standard")
+                if err != nil {
+                    return err
+                }
+                zones, err := framework.GetClusterZones(c)
+                if err != nil {
+                    return err
+                }
+                return verifyZonesInPV(volume, zones, false /* match */)
+            },
+        },
+    }
+
+    for _, test := range tests {
+        class := newStorageClass(test, ns, "" /* suffix */)
+        claim := newClaim(test, ns, "" /* suffix */)
+        claim.Spec.StorageClassName = &class.Name
+        testDynamicProvisioning(test, c, claim, class)
+    }
+}
+
+func testZonalFailover(c clientset.Interface, ns string) {
+    nodes := framework.GetReadySchedulableNodesOrDie(c)
+    nodeCount := len(nodes.Items)
+
+    cloudZones := getTwoRandomZones(c)
+    class := newRegionalStorageClass(ns, cloudZones)
+    claimTemplate := newClaimTemplate(ns)
+    claimTemplate.Spec.StorageClassName = &class.Name
+    statefulSet, service, regionalPDLabels := newStatefulSet(claimTemplate, ns)
+
+    By("creating a StorageClass " + class.Name)
+    _, err := c.StorageV1().StorageClasses().Create(class)
+    Expect(err).NotTo(HaveOccurred())
+    defer func() {
+        framework.Logf("deleting storage class %s", class.Name)
+        framework.ExpectNoError(c.StorageV1().StorageClasses().Delete(class.Name, nil),
+            "Error deleting StorageClass %s", class.Name)
+    }()
+
+    By("creating a StatefulSet")
+    _, err = c.CoreV1().Services(ns).Create(service)
+    Expect(err).NotTo(HaveOccurred())
+    _, err = c.AppsV1().StatefulSets(ns).Create(statefulSet)
+    Expect(err).NotTo(HaveOccurred())
+
+    defer func() {
+        framework.Logf("deleting statefulset %q/%q", statefulSet.Namespace, statefulSet.Name)
+        // typically this claim has already been deleted
+        framework.ExpectNoError(c.AppsV1().StatefulSets(ns).Delete(statefulSet.Name, nil /* options */),
+            "Error deleting StatefulSet %s", statefulSet.Name)
+
+        framework.Logf("deleting claims in namespace %s", ns)
+        pvc := getPVC(c, ns, regionalPDLabels)
+        framework.ExpectNoError(c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, nil),
+            "Error deleting claim %s.", pvc.Name)
+        if pvc.Spec.VolumeName != "" {
+            err = framework.WaitForPersistentVolumeDeleted(c, pvc.Spec.VolumeName, framework.Poll, pvDeletionTimeout)
+            if err != nil {
+                framework.Logf("WARNING: PV %s is not yet deleted, and subsequent tests may be affected.", pvc.Spec.VolumeName)
+            }
+        }
+    }()
+
+    err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
+    if err != nil {
+        pod := getPod(c, ns, regionalPDLabels)
+        Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(BeTrue(),
+            "The statefulset pod has the following conditions: %s", pod.Status.Conditions)
+        Expect(err).NotTo(HaveOccurred())
+    }
+
+    pvc := getPVC(c, ns, regionalPDLabels)
+
+    By("getting zone information from pod")
+    pod := getPod(c, ns, regionalPDLabels)
+    nodeName := pod.Spec.NodeName
+    node, err := c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+    Expect(err).ToNot(HaveOccurred())
+    podZone := node.Labels[apis.LabelZoneFailureDomain]
+
+    // TODO (verult) Consider using node taints to simulate zonal failure instead.
+    By("deleting instance group belonging to pod's zone")
+
+    // Asynchronously detect a pod reschedule is triggered during/after instance group deletion.
+    waitStatus := make(chan error)
+    go func() {
+        waitStatus <- waitForStatefulSetReplicasNotReady(statefulSet.Name, ns, c)
+    }()
+
+    cloud, err := framework.GetGCECloud()
+    if err != nil {
+        Expect(err).NotTo(HaveOccurred())
+    }
+    instanceGroupName := framework.TestContext.CloudConfig.NodeInstanceGroup
+    instanceGroup, err := cloud.GetInstanceGroup(instanceGroupName, podZone)
+    Expect(err).NotTo(HaveOccurred(),
+        "Error getting instance group %s in zone %s", instanceGroupName, podZone)
+    err = framework.DeleteManagedInstanceGroup(podZone)
+    Expect(err).NotTo(HaveOccurred(),
+        "Error deleting instance group in zone %s", podZone)
+
+    defer func() {
+        framework.Logf("recreating instance group %s", instanceGroup.Name)
+
+        // HACK improve this when Managed Instance Groups are available through the cloud provider API
+        templateName := strings.Replace(instanceGroupName, "group", "template", 1 /* n */)
+
+        framework.ExpectNoError(framework.CreateManagedInstanceGroup(instanceGroup.Size, podZone, templateName),
+            "Error recreating instance group %s in zone %s", instanceGroup.Name, podZone)
+        framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount, framework.RestartNodeReadyAgainTimeout),
+            "Error waiting for nodes from the new instance group to become ready.")
+    }()
+
+    err = <-waitStatus
+    Expect(err).ToNot(HaveOccurred(), "Error waiting for replica to be deleted during failover: %v", err)
+
+    err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
+    if err != nil {
+        pod := getPod(c, ns, regionalPDLabels)
+        Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(BeTrue(),
+            "The statefulset pod has the following conditions: %s", pod.Status.Conditions)
+        Expect(err).NotTo(HaveOccurred())
+    }
+
+    By("verifying the same PVC is used by the new pod")
+    Expect(getPVC(c, ns, regionalPDLabels).Name).To(Equal(pvc.Name),
+        "The same PVC should be used after failover.")
+
+    By("verifying the container output has 2 lines, indicating the pod has been created twice using the same regional PD.")
+    pod = getPod(c, ns, regionalPDLabels)
+    logs, err := framework.GetPodLogs(c, ns, pod.Name, "")
+    Expect(err).NotTo(HaveOccurred(),
+        "Error getting logs from pod %s in namespace %s", pod.Name, ns)
+    lineCount := len(strings.Split(strings.TrimSpace(logs), "\n"))
+    expectedLineCount := 2
+    Expect(lineCount).To(Equal(expectedLineCount),
+        "Line count of the written file should be %d.", expectedLineCount)
+
+    // Verify the pod is scheduled in the other zone.
+ By("verifying the pod is scheduled in a different zone.") + var otherZone string + if cloudZones[0] == podZone { + otherZone = cloudZones[1] + } else { + otherZone = cloudZones[0] + } + nodeName = pod.Spec.NodeName + node, err = c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) + Expect(err).ToNot(HaveOccurred()) + newPodZone := node.Labels[apis.LabelZoneFailureDomain] + Expect(newPodZone).To(Equal(otherZone), + "The pod should be scheduled in zone %s after all nodes in zone %s have been deleted", otherZone, podZone) + +} + +func getPVC(c clientset.Interface, ns string, pvcLabels map[string]string) *v1.PersistentVolumeClaim { + selector := labels.Set(pvcLabels).AsSelector() + options := metav1.ListOptions{LabelSelector: selector.String()} + pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(options) + Expect(err).NotTo(HaveOccurred()) + Expect(len(pvcList.Items)).To(Equal(1), "There should be exactly 1 PVC matched.") + + return &pvcList.Items[0] +} + +func getPod(c clientset.Interface, ns string, podLabels map[string]string) *v1.Pod { + selector := labels.Set(podLabels).AsSelector() + options := metav1.ListOptions{LabelSelector: selector.String()} + podList, err := c.CoreV1().Pods(ns).List(options) + Expect(err).NotTo(HaveOccurred()) + Expect(len(podList.Items)).To(Equal(1), "There should be exactly 1 pod matched.") + + return &podList.Items[0] +} + +// Generates the spec of a StatefulSet with 1 replica that mounts a Regional PD. +func newStatefulSet(claimTemplate *v1.PersistentVolumeClaim, ns string) (sts *appsv1.StatefulSet, svc *v1.Service, labels map[string]string) { + var replicas int32 = 1 + labels = map[string]string{"app": "regional-pd-workload"} + + svc = &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "regional-pd-service", + Namespace: ns, + Labels: labels, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{ + Port: 80, + Name: "web", + }}, + ClusterIP: v1.ClusterIPNone, + Selector: labels, + }, + } + + sts = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "regional-pd-sts", + Namespace: ns, + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + ServiceName: svc.Name, + Replicas: &replicas, + Template: *newPodTemplate(labels), + VolumeClaimTemplates: []v1.PersistentVolumeClaim{*claimTemplate}, + }, + } + + return +} + +func newPodTemplate(labels map[string]string) *v1.PodTemplateSpec { + return &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + // This container writes its pod name to a file in the Regional PD + // and prints the entire file to stdout. 
+ { + Name: "busybox", + Image: "gcr.io/google_containers/busybox", + Command: []string{"sh", "-c"}, + Args: []string{ + "echo ${POD_NAME} >> /mnt/data/regional-pd/pods.txt;" + + "cat /mnt/data/regional-pd/pods.txt;" + + "sleep 3600;", + }, + Env: []v1.EnvVar{{ + Name: "POD_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }}, + Ports: []v1.ContainerPort{{ + ContainerPort: 80, + Name: "web", + }}, + VolumeMounts: []v1.VolumeMount{{ + Name: "regional-pd-vol", + MountPath: "/mnt/data/regional-pd", + }}, + }, + }, + }, + } +} + +func newClaimTemplate(ns string) *v1.PersistentVolumeClaim { + return &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "regional-pd-vol", + Namespace: ns, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"), + }, + }, + }, + } +} + +func newRegionalStorageClass(namespace string, zones []string) *storage.StorageClass { + return &storage.StorageClass{ + TypeMeta: metav1.TypeMeta{ + Kind: "StorageClass", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: namespace + "-sc", + }, + Provisioner: "kubernetes.io/gce-pd", + Parameters: map[string]string{ + "type": "pd-standard", + "zones": strings.Join(zones, ","), + "replication-type": "regional-pd", + }, + } +} + +func getTwoRandomZones(c clientset.Interface) []string { + zones, err := framework.GetClusterZones(c) + Expect(err).ToNot(HaveOccurred()) + Expect(zones.Len()).To(BeNumerically(">=", 2), + "The test should only be run in multizone clusters.") + + zone1, _ := zones.PopAny() + zone2, _ := zones.PopAny() + return []string{zone1, zone2} +} + +// Waits for at least 1 replica of a StatefulSet to become not ready or until timeout occurs, whichever comes first. +func waitForStatefulSetReplicasNotReady(statefulSetName, ns string, c clientset.Interface) error { + const poll = 3 * time.Second + const timeout = statefulSetReadyTimeout + + framework.Logf("Waiting up to %v for StatefulSet %s to have at least 1 replica to become not ready", timeout, statefulSetName) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { + sts, err := c.AppsV1().StatefulSets(ns).Get(statefulSetName, metav1.GetOptions{}) + if err != nil { + framework.Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, poll, err) + continue + } else { + if sts.Status.ReadyReplicas < *sts.Spec.Replicas { + framework.Logf("%d replicas are ready out of a total of %d replicas in StatefulSet %s. (%v)", + sts.Status.ReadyReplicas, *sts.Spec.Replicas, statefulSetName, time.Since(start)) + return nil + } else { + framework.Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas) + } + } + } + return fmt.Errorf("All replicas in StatefulSet %s are still ready within %v", statefulSetName, timeout) +} + +// If match is true, check if zones in PV exactly match zones given. +// Otherwise, check whether zones in PV is superset of zones given. 
+func verifyZonesInPV(volume *v1.PersistentVolume, zones sets.String, match bool) error {
+    pvZones, err := util.LabelZonesToSet(volume.Labels[apis.LabelZoneFailureDomain])
+    if err != nil {
+        return err
+    }
+
+    if match && zones.Equal(pvZones) || !match && zones.IsSuperset(pvZones) {
+        return nil
+    }
+
+    return fmt.Errorf("Zones in StorageClass are %v, but zones in PV are %v", zones, pvZones)
+
+}
diff --git a/test/e2e/storage/volume_provisioning.go b/test/e2e/storage/volume_provisioning.go
index dc621a0bb20..4193d86b375 100644
--- a/test/e2e/storage/volume_provisioning.go
+++ b/test/e2e/storage/volume_provisioning.go
@@ -43,7 +43,6 @@ import (
     clientset "k8s.io/client-go/kubernetes"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
     storageutil "k8s.io/kubernetes/pkg/apis/storage/v1/util"
-    kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
     "k8s.io/kubernetes/test/e2e/framework"
     "k8s.io/kubernetes/test/e2e/storage/utils"
 )
@@ -1019,16 +1018,8 @@ func deleteProvisionedVolumesAndDisks(c clientset.Interface, pvs []*v1.Persisten
 }
 
 func getRandomCloudZone(c clientset.Interface) string {
-    nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
-    Expect(err).NotTo(HaveOccurred())
-
-    // collect values of zone label from all nodes
-    zones := sets.NewString()
-    for _, node := range nodes.Items {
-        if zone, found := node.Labels[kubeletapis.LabelZoneFailureDomain]; found {
-            zones.Insert(zone)
-        }
-    }
+    zones, err := framework.GetClusterZones(c)
+    Expect(err).ToNot(HaveOccurred())
     // return "" in case that no node has zone label
     zone, _ := zones.PopAny()
    return zone
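
For reviewers trying out the new framework helpers, here is a minimal, illustrative sketch (not part of the patch) of how a storage test could consume SkipUnlessMultizone, GetClusterZones, and WaitForStatefulSetReplicasReady. The spec name, namespace prefix, and the assumed StatefulSet "web" are hypothetical.

package storage

import (
    "time"

    . "github.com/onsi/ginkgo"

    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/kubernetes/test/e2e/framework"
)

// Illustrative usage of the helpers introduced in this change; the StatefulSet
// named "web" is assumed to have been created elsewhere in the test.
var _ = Describe("Example consumer of the multizone helpers", func() {
    f := framework.NewDefaultFramework("example-multizone")
    var c clientset.Interface

    BeforeEach(func() {
        c = f.ClientSet
        framework.SkipUnlessProviderIs("gce", "gke")
        // Skips the spec unless the cluster spans more than one zone.
        framework.SkipUnlessMultizone(c)
    })

    It("lists zones and waits for a StatefulSet to become ready", func() {
        zones, err := framework.GetClusterZones(c)
        framework.ExpectNoError(err)
        framework.Logf("cluster zones: %v", zones.List())

        // Poll every 3 seconds for up to 5 minutes.
        err = framework.WaitForStatefulSetReplicasReady("web", f.Namespace.Name, c, 3*time.Second, 5*time.Minute)
        framework.ExpectNoError(err)
    })
})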