Merge pull request #85149 from oomichi/move-util-4

Move functions from e2e/framework/util.go Part-4
This commit is contained in:
Kubernetes Prow Robot
2019-11-13 20:01:58 -08:00
committed by GitHub
9 changed files with 242 additions and 232 deletions

View File

@@ -57,7 +57,6 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilyaml "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
@@ -252,40 +251,6 @@ func NodeOSDistroIs(supportedNodeOsDistros ...string) bool {
return false
}
// WaitForDaemonSets for all daemonsets in the given namespace to be ready
// (defined as all but 'allowedNotReadyNodes' pods associated with that
// daemonset are ready).
func WaitForDaemonSets(c clientset.Interface, ns string, allowedNotReadyNodes int32, timeout time.Duration) error {
start := time.Now()
Logf("Waiting up to %v for all daemonsets in namespace '%s' to start",
timeout, ns)
return wait.PollImmediate(Poll, timeout, func() (bool, error) {
dsList, err := c.AppsV1().DaemonSets(ns).List(metav1.ListOptions{})
if err != nil {
Logf("Error getting daemonsets in namespace: '%s': %v", ns, err)
if testutils.IsRetryableAPIError(err) {
return false, nil
}
return false, err
}
var notReadyDaemonSets []string
for _, ds := range dsList.Items {
Logf("%d / %d pods ready in namespace '%s' in daemonset '%s' (%d seconds elapsed)", ds.Status.NumberReady, ds.Status.DesiredNumberScheduled, ns, ds.ObjectMeta.Name, int(time.Since(start).Seconds()))
if ds.Status.DesiredNumberScheduled-ds.Status.NumberReady > allowedNotReadyNodes {
notReadyDaemonSets = append(notReadyDaemonSets, ds.ObjectMeta.Name)
}
}
if len(notReadyDaemonSets) > 0 {
Logf("there are not ready daemonsets: %v", notReadyDaemonSets)
return false, nil
}
return true, nil
})
}
func kubectlLogPod(c clientset.Interface, pod v1.Pod, containerNameSubstr string, logFunc func(ftm string, args ...interface{})) {
for _, container := range pod.Spec.Containers {
if strings.Contains(container.Name, containerNameSubstr) {
@@ -402,24 +367,6 @@ func WaitForDefaultServiceAccountInNamespace(c clientset.Interface, namespace st
return waitForServiceAccountInNamespace(c, namespace, "default", ServiceAccountProvisionTimeout)
}
// WaitForStatefulSetReplicasReady waits for all replicas of a StatefulSet to become ready or until timeout occurs, whichever comes first.
func WaitForStatefulSetReplicasReady(statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error {
Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
sts, err := c.AppsV1().StatefulSets(ns).Get(statefulSetName, metav1.GetOptions{})
if err != nil {
Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err)
continue
}
if sts.Status.ReadyReplicas == *sts.Spec.Replicas {
Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start))
return nil
}
Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas)
}
return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout)
}
// WaitForPersistentVolumeDeleted waits for a PersistentVolume to get deleted or until timeout occurs, whichever comes first.
func WaitForPersistentVolumeDeleted(c clientset.Interface, pvName string, Poll, timeout time.Duration) error {
Logf("Waiting up to %v for PersistentVolume %s to get deleted", timeout, pvName)
@@ -545,123 +492,6 @@ func CheckTestingNSDeletedExcept(c clientset.Interface, skip string) error {
return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out")
}
// ContainerInitInvariant checks for an init containers are initialized and invariant on both older and newer.
func ContainerInitInvariant(older, newer runtime.Object) error {
oldPod := older.(*v1.Pod)
newPod := newer.(*v1.Pod)
if len(oldPod.Spec.InitContainers) == 0 {
return nil
}
if len(oldPod.Spec.InitContainers) != len(newPod.Spec.InitContainers) {
return fmt.Errorf("init container list changed")
}
if oldPod.UID != newPod.UID {
return fmt.Errorf("two different pods exist in the condition: %s vs %s", oldPod.UID, newPod.UID)
}
if err := initContainersInvariants(oldPod); err != nil {
return err
}
if err := initContainersInvariants(newPod); err != nil {
return err
}
oldInit, _, _ := initialized(oldPod)
newInit, _, _ := initialized(newPod)
if oldInit && !newInit {
// TODO: we may in the future enable resetting initialized = false if the kubelet needs to restart it
// from scratch
return fmt.Errorf("pod cannot be initialized and then regress to not being initialized")
}
return nil
}
func initContainersInvariants(pod *v1.Pod) error {
allInit, initFailed, err := initialized(pod)
if err != nil {
return err
}
if !allInit || initFailed {
for _, s := range pod.Status.ContainerStatuses {
if s.State.Waiting == nil || s.RestartCount != 0 {
return fmt.Errorf("container %s is not waiting but initialization not complete", s.Name)
}
if s.State.Waiting.Reason != "PodInitializing" {
return fmt.Errorf("container %s should have reason PodInitializing: %s", s.Name, s.State.Waiting.Reason)
}
}
}
_, c := podutil.GetPodCondition(&pod.Status, v1.PodInitialized)
if c == nil {
return fmt.Errorf("pod does not have initialized condition")
}
if c.LastTransitionTime.IsZero() {
return fmt.Errorf("PodInitialized condition should always have a transition time")
}
switch {
case c.Status == v1.ConditionUnknown:
return fmt.Errorf("PodInitialized condition should never be Unknown")
case c.Status == v1.ConditionTrue && (initFailed || !allInit):
return fmt.Errorf("PodInitialized condition was True but all not all containers initialized")
case c.Status == v1.ConditionFalse && (!initFailed && allInit):
return fmt.Errorf("PodInitialized condition was False but all containers initialized")
}
return nil
}
// InvariantFunc is a func that checks for invariant.
type InvariantFunc func(older, newer runtime.Object) error
// CheckInvariants checks for invariant of the each events.
func CheckInvariants(events []watch.Event, fns ...InvariantFunc) error {
errs := sets.NewString()
for i := range events {
j := i + 1
if j >= len(events) {
continue
}
for _, fn := range fns {
if err := fn(events[i].Object, events[j].Object); err != nil {
errs.Insert(err.Error())
}
}
}
if errs.Len() > 0 {
return fmt.Errorf("invariants violated:\n* %s", strings.Join(errs.List(), "\n* "))
}
return nil
}
// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status.
func WaitForRCToStabilize(c clientset.Interface, ns, name string, timeout time.Duration) error {
options := metav1.ListOptions{FieldSelector: fields.Set{
"metadata.name": name,
"metadata.namespace": ns,
}.AsSelector().String()}
w, err := c.CoreV1().ReplicationControllers(ns).Watch(options)
if err != nil {
return err
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "")
}
switch rc := event.Object.(type) {
case *v1.ReplicationController:
if rc.Name == name && rc.Namespace == ns &&
rc.Generation <= rc.Status.ObservedGeneration &&
*(rc.Spec.Replicas) == rc.Status.Replicas {
return true, nil
}
Logf("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
name, rc.Generation, rc.Status.ObservedGeneration, *(rc.Spec.Replicas), rc.Status.Replicas)
}
return false, nil
})
return err
}
// WaitForService waits until the service appears (exist == true), or disappears (exist == false)
func WaitForService(c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
@@ -688,33 +518,6 @@ func WaitForService(c clientset.Interface, namespace, name string, exist bool, i
return nil
}
// WaitForServiceWithSelector waits until any service with given selector appears (exist == true), or disappears (exist == false)
func WaitForServiceWithSelector(c clientset.Interface, namespace string, selector labels.Selector, exist bool, interval,
timeout time.Duration) error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
services, err := c.CoreV1().Services(namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
switch {
case len(services.Items) != 0:
Logf("Service with %s in namespace %s found.", selector.String(), namespace)
return exist, nil
case len(services.Items) == 0:
Logf("Service with %s in namespace %s disappeared.", selector.String(), namespace)
return !exist, nil
case !testutils.IsRetryableAPIError(err):
Logf("Non-retryable failure while listing service.")
return false, err
default:
Logf("List service with %s in namespace %s failed: %v", selector.String(), namespace, err)
return false, nil
}
})
if err != nil {
stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
return fmt.Errorf("error waiting for service with %s in namespace %s %s: %v", selector.String(), namespace, stateMsg[exist], err)
}
return nil
}
//WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
func WaitForServiceEndpointsNum(c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
return wait.Poll(interval, timeout, func() (bool, error) {
@@ -1159,8 +962,8 @@ func (f *Framework) MatchContainerOutput(
// EventsLister is a func that lists events.
type EventsLister func(opts metav1.ListOptions, ns string) (*v1.EventList, error)
// DumpEventsInNamespace dumps events in the given namespace.
func DumpEventsInNamespace(eventsLister EventsLister, namespace string) {
// dumpEventsInNamespace dumps events in the given namespace.
func dumpEventsInNamespace(eventsLister EventsLister, namespace string) {
ginkgo.By(fmt.Sprintf("Collecting events from namespace %q.", namespace))
events, err := eventsLister(metav1.ListOptions{}, namespace)
ExpectNoError(err, "failed to list events in namespace %q", namespace)
@@ -1181,7 +984,7 @@ func DumpEventsInNamespace(eventsLister EventsLister, namespace string) {
// DumpAllNamespaceInfo dumps events, pods and nodes information in the given namespace.
func DumpAllNamespaceInfo(c clientset.Interface, namespace string) {
DumpEventsInNamespace(func(opts metav1.ListOptions, ns string) (*v1.EventList, error) {
dumpEventsInNamespace(func(opts metav1.ListOptions, ns string) (*v1.EventList, error) {
return c.CoreV1().Events(ns).List(opts)
}, namespace)
@@ -2223,25 +2026,3 @@ func PrettyPrintJSON(metrics interface{}) string {
}
return string(formatted.Bytes())
}
// initialized checks the state of all init containers in the pod.
func initialized(pod *v1.Pod) (ok bool, failed bool, err error) {
allInit := true
initFailed := false
for _, s := range pod.Status.InitContainerStatuses {
switch {
case initFailed && s.State.Waiting == nil:
return allInit, initFailed, fmt.Errorf("container %s is after a failed container but isn't waiting", s.Name)
case allInit && s.State.Waiting == nil:
return allInit, initFailed, fmt.Errorf("container %s is after an initializing container but isn't waiting", s.Name)
case s.State.Terminated == nil:
allInit = false
case s.State.Terminated.ExitCode != 0:
allInit = false
initFailed = true
case !s.Ready:
return allInit, initFailed, fmt.Errorf("container %s initialized but isn't marked as ready", s.Name)
}
}
return allInit, initFailed, nil
}