feat: move service_util to separated package

This commit is contained in:
draveness
2019-07-02 00:55:04 +08:00
parent 3f1cb97f9a
commit c38ae01f8e
50 changed files with 1339 additions and 1076 deletions

View File

@@ -24,7 +24,7 @@ import (
"strconv"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -38,19 +38,19 @@ import (
// NewStatefulSet creates a new Webserver StatefulSet for testing. The StatefulSet is named name, is in namespace ns,
// statefulPodsMounts are the mounts that will be backed by PVs. podsMounts are the mounts that are mounted directly
// to the Pod. labels are the labels that will be usd for the StatefulSet selector.
func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []corev1.VolumeMount, podMounts []corev1.VolumeMount, labels map[string]string) *appsv1.StatefulSet {
func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *appsv1.StatefulSet {
mounts := append(statefulPodMounts, podMounts...)
claims := []corev1.PersistentVolumeClaim{}
claims := []v1.PersistentVolumeClaim{}
for _, m := range statefulPodMounts {
claims = append(claims, NewStatefulSetPVC(m.Name))
}
vols := []corev1.Volume{}
vols := []v1.Volume{}
for _, m := range podMounts {
vols = append(vols, corev1.Volume{
vols = append(vols, v1.Volume{
Name: m.Name,
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: fmt.Sprintf("/tmp/%v", m.Name),
},
},
@@ -71,13 +71,13 @@ func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulP
MatchLabels: labels,
},
Replicas: func(i int32) *int32 { return &i }(replicas),
Template: corev1.PodTemplateSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: map[string]string{},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "webserver",
Image: imageutils.GetE2EImage(imageutils.Httpd),
@@ -95,18 +95,18 @@ func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulP
}
// NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets.
func NewStatefulSetPVC(name string) corev1.PersistentVolumeClaim {
return corev1.PersistentVolumeClaim{
func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
return v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
},
},
},
@@ -114,17 +114,17 @@ func NewStatefulSetPVC(name string) corev1.PersistentVolumeClaim {
}
// CreateStatefulSetService creates a Headless Service with Name name and Selector set to match labels.
func CreateStatefulSetService(name string, labels map[string]string) *corev1.Service {
headlessService := &corev1.Service{
func CreateStatefulSetService(name string, labels map[string]string) *v1.Service {
headlessService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: corev1.ServiceSpec{
Spec: v1.ServiceSpec{
Selector: labels,
},
}
headlessService.Spec.Ports = []corev1.ServicePort{
{Port: 80, Name: "http", Protocol: corev1.ProtocolTCP},
headlessService.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
}
headlessService.Spec.ClusterIP = "None"
return headlessService
@@ -149,7 +149,7 @@ func BreakHTTPProbe(c clientset.Interface, ss *appsv1.StatefulSet) error {
}
// BreakPodHTTPProbe breaks the readiness probe for Nginx StatefulSet containers in one pod.
func BreakPodHTTPProbe(ss *appsv1.StatefulSet, pod *corev1.Pod) error {
func BreakPodHTTPProbe(ss *appsv1.StatefulSet, pod *v1.Pod) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
@@ -173,7 +173,7 @@ func RestoreHTTPProbe(c clientset.Interface, ss *appsv1.StatefulSet) error {
}
// RestorePodHTTPProbe restores the readiness probe for Nginx StatefulSet containers in pod.
func RestorePodHTTPProbe(ss *appsv1.StatefulSet, pod *corev1.Pod) error {
func RestorePodHTTPProbe(ss *appsv1.StatefulSet, pod *v1.Pod) error {
path := httpProbe.HTTPGet.Path
if path == "" {
return fmt.Errorf("path expected to be not empty: %v", path)
@@ -185,14 +185,14 @@ func RestorePodHTTPProbe(ss *appsv1.StatefulSet, pod *corev1.Pod) error {
return err
}
func hasPauseProbe(pod *corev1.Pod) bool {
func hasPauseProbe(pod *v1.Pod) bool {
probe := pod.Spec.Containers[0].ReadinessProbe
return probe != nil && reflect.DeepEqual(probe.Exec.Command, pauseProbe.Exec.Command)
}
var httpProbe = &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
var httpProbe = &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80},
},
@@ -202,16 +202,16 @@ var httpProbe = &corev1.Probe{
FailureThreshold: 1,
}
var pauseProbe = &corev1.Probe{
Handler: corev1.Handler{
Exec: &corev1.ExecAction{Command: []string{"test", "-f", "/data/statefulset-continue"}},
var pauseProbe = &v1.Probe{
Handler: v1.Handler{
Exec: &v1.ExecAction{Command: []string{"test", "-f", "/data/statefulset-continue"}},
},
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 1,
}
type statefulPodsByOrdinal []corev1.Pod
type statefulPodsByOrdinal []v1.Pod
func (sp statefulPodsByOrdinal) Len() int {
return len(sp)
@@ -242,7 +242,7 @@ func ResumeNextPod(c clientset.Interface, ss *appsv1.StatefulSet) {
podList := GetPodList(c, ss)
resumedPod := ""
for _, pod := range podList.Items {
if pod.Status.Phase != corev1.PodRunning {
if pod.Status.Phase != v1.PodRunning {
e2elog.Failf("Found pod in phase %q, cannot resume", pod.Status.Phase)
}
if podutil.IsPodReady(&pod) || !hasPauseProbe(&pod) {
@@ -259,13 +259,13 @@ func ResumeNextPod(c clientset.Interface, ss *appsv1.StatefulSet) {
}
// SortStatefulPods sorts pods by their ordinals
func SortStatefulPods(pods *corev1.PodList) {
func SortStatefulPods(pods *v1.PodList) {
sort.Sort(statefulPodsByOrdinal(pods.Items))
}
var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")
func getStatefulPodOrdinal(pod *corev1.Pod) int {
func getStatefulPodOrdinal(pod *v1.Pod) int {
ordinal := -1
subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
if len(subMatches) < 3 {

View File

@@ -23,7 +23,7 @@ import (
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -62,7 +62,7 @@ func CreateStatefulSet(c clientset.Interface, manifestPath, ns string) *appsv1.S
}
// GetPodList gets the current Pods in ss.
func GetPodList(c clientset.Interface, ss *appsv1.StatefulSet) *corev1.PodList {
func GetPodList(c clientset.Interface, ss *appsv1.StatefulSet) *v1.PodList {
selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
e2efwk.ExpectNoError(err)
podList, err := c.CoreV1().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
@@ -182,7 +182,7 @@ func Scale(c clientset.Interface, ss *appsv1.StatefulSet, count int32) (*appsv1.
e2elog.Logf("Scaling statefulset %s to %d", name, count)
ss = update(c, ns, name, func(ss *appsv1.StatefulSet) { *(ss.Spec.Replicas) = count })
var statefulPodList *corev1.PodList
var statefulPodList *v1.PodList
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
statefulPodList = GetPodList(c, ss)
if int32(len(statefulPodList.Items)) == count {
@@ -194,7 +194,7 @@ func Scale(c clientset.Interface, ss *appsv1.StatefulSet, count int32) (*appsv1.
unhealthy := []string{}
for _, statefulPod := range statefulPodList.Items {
delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, podutil.IsPodReady(&statefulPod)
if delTs != nil || phase != corev1.PodRunning || !readiness {
if delTs != nil || phase != v1.PodRunning || !readiness {
unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
}
}
@@ -313,7 +313,7 @@ func ExecInStatefulPods(c clientset.Interface, ss *appsv1.StatefulSet, cmd strin
type updateStatefulSetFunc func(*appsv1.StatefulSet)
// VerifyStatefulPodFunc is a func that examines a StatefulSetPod.
type VerifyStatefulPodFunc func(*corev1.Pod)
type VerifyStatefulPodFunc func(*v1.Pod)
// VerifyPodAtIndex applies a visitor pattern to the Pod at index in ss. verify is applied to the Pod to "visit" it.
func VerifyPodAtIndex(c clientset.Interface, index int, ss *appsv1.StatefulSet, verify VerifyStatefulPodFunc) {

View File

@@ -20,7 +20,7 @@ import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
@@ -32,8 +32,8 @@ import (
// a RollingUpdateStatefulSetStrategyType with a non-nil RollingUpdate and Partition. All Pods with ordinals less
// than or equal to the Partition are expected to be at set's current revision. All other Pods are expected to be
// at its update revision.
func WaitForPartitionedRollingUpdate(c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *corev1.PodList) {
var pods *corev1.PodList
func WaitForPartitionedRollingUpdate(c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
if set.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
e2elog.Failf("StatefulSet %s/%s attempt to wait for partitioned update with updateStrategy %s",
set.Namespace,
@@ -45,7 +45,7 @@ func WaitForPartitionedRollingUpdate(c clientset.Interface, set *appsv1.Stateful
set.Namespace,
set.Name)
}
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *corev1.PodList) (bool, error) {
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
@@ -102,8 +102,8 @@ func WaitForRunning(c clientset.Interface, numPodsRunning, numPodsReady int32, s
shouldBeReady := getStatefulPodOrdinal(&p) < int(numPodsReady)
isReady := podutil.IsPodReady(&p)
desiredReadiness := shouldBeReady == isReady
e2elog.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, corev1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
if p.Status.Phase != corev1.PodRunning || !desiredReadiness {
e2elog.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
if p.Status.Phase != v1.PodRunning || !desiredReadiness {
return false, nil
}
}
@@ -115,7 +115,7 @@ func WaitForRunning(c clientset.Interface, numPodsRunning, numPodsReady int32, s
}
// WaitForState periodically polls for the ss and its pods until the until function returns either true or an error
func WaitForState(c clientset.Interface, ss *appsv1.StatefulSet, until func(*appsv1.StatefulSet, *corev1.PodList) (bool, error)) {
func WaitForState(c clientset.Interface, ss *appsv1.StatefulSet, until func(*appsv1.StatefulSet, *v1.PodList) (bool, error)) {
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
func() (bool, error) {
ssGet, err := c.AppsV1().StatefulSets(ss.Namespace).Get(ss.Name, metav1.GetOptions{})
@@ -133,7 +133,7 @@ func WaitForState(c clientset.Interface, ss *appsv1.StatefulSet, until func(*app
// WaitForStatus waits for the StatefulSetStatus's ObservedGeneration to be greater than or equal to set's Generation.
// The returned StatefulSet contains such a StatefulSetStatus
func WaitForStatus(c clientset.Interface, set *appsv1.StatefulSet) *appsv1.StatefulSet {
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods *corev1.PodList) (bool, error) {
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods *v1.PodList) (bool, error) {
if set2.Status.ObservedGeneration >= set.Generation {
set = set2
return true, nil
@@ -149,9 +149,9 @@ func WaitForRunningAndReady(c clientset.Interface, numStatefulPods int32, ss *ap
}
// WaitForPodReady waits for the Pod named podName in set to exist and have a Ready condition.
func WaitForPodReady(c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *corev1.PodList) {
var pods *corev1.PodList
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *corev1.PodList) (bool, error) {
func WaitForPodReady(c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
for i := range pods.Items {
@@ -165,9 +165,9 @@ func WaitForPodReady(c clientset.Interface, set *appsv1.StatefulSet, podName str
}
// WaitForPodNotReady waits for the Pod named podName in set to exist and to not have a Ready condition.
func WaitForPodNotReady(c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *corev1.PodList) {
var pods *corev1.PodList
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *corev1.PodList) (bool, error) {
func WaitForPodNotReady(c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
for i := range pods.Items {
@@ -182,15 +182,15 @@ func WaitForPodNotReady(c clientset.Interface, set *appsv1.StatefulSet, podName
// WaitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to
// complete. set must have a RollingUpdateStatefulSetStrategyType.
func WaitForRollingUpdate(c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *corev1.PodList) {
var pods *corev1.PodList
func WaitForRollingUpdate(c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *v1.PodList) {
var pods *v1.PodList
if set.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
e2elog.Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s",
set.Namespace,
set.Name,
set.Spec.UpdateStrategy.Type)
}
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *corev1.PodList) (bool, error) {
WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
if len(pods.Items) < int(*set.Spec.Replicas) {