Move resource methods from e2e fw to e2e resource fw

drfish 2020-02-13 15:31:39 +08:00
parent 71ad0a9020
commit f4da086cbe
19 changed files with 248 additions and 192 deletions
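The change is mechanical at every call site: the moved helpers keep their names and signatures, only their home package changes from framework to e2eresource. A minimal sketch of the pattern (the wrapper function and its package name are illustrative, not part of the commit):

package migration // illustrative name, for demonstration only

import (
	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
	"k8s.io/kubernetes/test/e2e/framework"
	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
)

// deleteJobAndWait shows the before/after shape of the call sites below.
func deleteJobAndWait(f *framework.Framework, jobName string) {
	// Before (removed): framework.DeleteResourceAndWaitForGC(...)
	// After (added): same helper, same arguments, new home package.
	framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(
		f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, jobName))
}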

View File

@@ -74,6 +74,7 @@ go_library(
"//test/e2e/framework/pv:go_default_library",
"//test/e2e/framework/rc:go_default_library",
"//test/e2e/framework/replicaset:go_default_library",
"//test/e2e/framework/resource:go_default_library",
"//test/e2e/framework/service:go_default_library",
"//test/e2e/framework/skipper:go_default_library",
"//test/e2e/framework/ssh:go_default_library",

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/test/e2e/framework"
e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
imageutils "k8s.io/kubernetes/test/utils/image"
)
@@ -211,7 +212,7 @@ var _ = SIGDescribe("CronJob", func() {
ginkgo.By("Deleting the job")
job := cronJob.Status.Active[0]
-framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
+framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
ginkgo.By("Ensuring job was deleted")
_, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name)

View File

@@ -39,6 +39,7 @@ import (
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
testutils "k8s.io/kubernetes/test/utils"
"github.com/onsi/ginkgo"
@@ -104,7 +105,7 @@ var _ = SIGDescribe("Daemon set [Serial]", func() {
if daemonsets != nil && len(daemonsets.Items) > 0 {
for _, ds := range daemonsets.Items {
ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
-framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
+framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, &ds))
framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
}

View File

@@ -44,6 +44,7 @@ import (
e2edeploy "k8s.io/kubernetes/test/e2e/framework/deployment"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e/framework/replicaset"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
testutil "k8s.io/kubernetes/test/utils"
@@ -193,7 +194,7 @@ func stopDeployment(c clientset.Interface, ns, deploymentName string) {
framework.ExpectNoError(err)
framework.Logf("Deleting deployment %s", deploymentName)
-err = framework.DeleteResourceAndWaitForGC(c, appsinternal.Kind("Deployment"), ns, deployment.Name)
+err = e2eresource.DeleteResourceAndWaitForGC(c, appsinternal.Kind("Deployment"), ns, deployment.Name)
framework.ExpectNoError(err)
framework.Logf("Ensuring deployment %s was deleted", deploymentName)

View File

@@ -32,6 +32,7 @@ import (
e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
@@ -82,7 +83,7 @@ var _ = SIGDescribe("Job", func() {
framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
ginkgo.By("Delete the job")
-err = framework.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name)
+err = e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to delete the job in namespace: %s", f.Namespace.Name)
ginkgo.By("Ensure the pods associated with the job are also deleted")
@@ -162,7 +163,7 @@ var _ = SIGDescribe("Job", func() {
framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
ginkgo.By("delete a job")
-framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
+framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
ginkgo.By("Ensuring job was deleted")
_, err = e2ejob.GetJob(f.ClientSet, f.Namespace.Name, job.Name)

View File

@@ -70,7 +70,6 @@ go_library(
"//test/e2e/framework/metrics:go_default_library",
"//test/e2e/framework/node:go_default_library",
"//test/e2e/framework/pod:go_default_library",
"//test/e2e/framework/resource:go_default_library",
"//test/e2e/framework/ssh:go_default_library",
"//test/e2e/system:go_default_library",
"//test/utils:go_default_library",

View File

@@ -17,6 +17,7 @@ go_library(
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/kubectl:go_default_library",
"//test/e2e/framework/rc:go_default_library",
"//test/e2e/framework/resource:go_default_library",
"//test/e2e/framework/service:go_default_library",
"//test/utils:go_default_library",
"//test/utils/image:go_default_library",

View File

@@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
testutils "k8s.io/kubernetes/test/utils"
@@ -414,9 +415,9 @@ func (rc *ResourceConsumer) CleanUp() {
// Wait some time to ensure all child goroutines are finished.
time.Sleep(10 * time.Second)
kind := rc.kind.GroupKind()
-framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(rc.clientSet, kind, rc.nsName, rc.name))
+framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(rc.clientSet, kind, rc.nsName, rc.name))
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(context.TODO(), rc.name, metav1.DeleteOptions{}))
-framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(rc.clientSet, schema.GroupKind{Kind: "ReplicationController"}, rc.nsName, rc.controllerName))
+framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(rc.clientSet, schema.GroupKind{Kind: "ReplicationController"}, rc.nsName, rc.controllerName))
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(context.TODO(), rc.controllerName, metav1.DeleteOptions{}))
}

View File

@@ -20,13 +20,11 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/resource:go_default_library",
"//test/utils:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",

View File

@@ -31,13 +31,11 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/kubelet/util/format"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
testutils "k8s.io/kubernetes/test/utils"
)
@@ -428,40 +426,6 @@ func PodsResponding(c clientset.Interface, ns, name string, wantName bool, pods
return wait.PollImmediate(poll, podRespondingTimeout, NewProxyResponseChecker(c, ns, label, name, wantName, pods).CheckAllResponses)
}
// WaitForControlledPodsRunning waits up to 10 minutes for pods to become Running.
func WaitForControlledPodsRunning(c clientset.Interface, ns, name string, kind schema.GroupKind) error {
rtObject, err := e2eresource.GetRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
return err
}
selector, err := e2eresource.GetSelectorFromRuntimeObject(rtObject)
if err != nil {
return err
}
replicas, err := e2eresource.GetReplicasFromRuntimeObject(rtObject)
if err != nil {
return err
}
err = testutils.WaitForEnoughPodsWithLabelRunning(c, ns, selector, int(replicas))
if err != nil {
return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", name, err)
}
return nil
}
// WaitForControlledPods waits up to podListTimeout for getting pods of the specified controller name and returns them.
func WaitForControlledPods(c clientset.Interface, ns, name string, kind schema.GroupKind) (pods *v1.PodList, err error) {
rtObject, err := e2eresource.GetRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
return nil, err
}
selector, err := e2eresource.GetSelectorFromRuntimeObject(rtObject)
if err != nil {
return nil, err
}
return WaitForPodsWithLabel(c, ns, selector)
}
// WaitForPodsWithLabelScheduled waits for all matching pods to become scheduled and at least one
// matching pod exists. Returns the list of matching pods.
func WaitForPodsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (pods *v1.PodList, err error) {

View File

@@ -13,6 +13,7 @@ go_library(
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/kubectl:go_default_library",
"//test/e2e/framework/resource:go_default_library",
"//test/utils:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
],

View File

@@ -28,6 +28,7 @@ import (
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/test/e2e/framework"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
testutils "k8s.io/kubernetes/test/utils"
)
@@ -70,12 +71,12 @@ func ByNameContainer(name string, replicas int32, labels map[string]string, c v1
// DeleteRCAndWaitForGC deletes only the Replication Controller and waits for GC to delete the pods.
func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error {
-return framework.DeleteResourceAndWaitForGC(c, schema.GroupKind{Kind: "ReplicationController"}, ns, name)
+return e2eresource.DeleteResourceAndWaitForGC(c, schema.GroupKind{Kind: "ReplicationController"}, ns, name)
}
// ScaleRC scales a Replication Controller to the desired size.
func ScaleRC(clientset clientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
-return framework.ScaleResource(clientset, scalesGetter, ns, name, size, wait, schema.GroupKind{Kind: "ReplicationController"}, v1.SchemeGroupVersion.WithResource("replicationcontrollers"))
+return e2eresource.ScaleResource(clientset, scalesGetter, ns, name, size, wait, schema.GroupKind{Kind: "ReplicationController"}, v1.SchemeGroupVersion.WithResource("replicationcontrollers"))
}
// RunRC launches (and verifies correctness of) a Replication Controller
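The rc package keeps its public wrappers, so existing callers of DeleteRCAndWaitForGC and ScaleRC need no changes; only the wrapper bodies now delegate to e2eresource. A usage sketch under that assumption (the package and function names here are illustrative):

package rcusage // illustrative name

import (
	clientset "k8s.io/client-go/kubernetes"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
)

// cleanupRC deletes the named ReplicationController and blocks until the
// garbage collector has reaped its pods; after this commit the wrapper
// forwards to e2eresource.DeleteResourceAndWaitForGC internally.
func cleanupRC(c clientset.Interface, ns, name string) error {
	return e2erc.DeleteRCAndWaitForGC(c, ns, name)
}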

View File

@@ -2,19 +2,31 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["runtimeobj.go"],
srcs = [
"resources.go",
"runtimeobj.go",
],
importpath = "k8s.io/kubernetes/test/e2e/framework/resource",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/extensions/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/pod:go_default_library",
"//test/utils:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
],
)

View File

@@ -0,0 +1,210 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resource
import (
"fmt"
"time"
"github.com/onsi/ginkgo"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
testutils "k8s.io/kubernetes/test/utils"
)
const (
// Number of objects that gc can delete in a second.
// GC issues 2 requests for a single delete.
gcThroughput = 10
)
// ScaleResource scales a resource to the given size.
func ScaleResource(
clientset clientset.Interface,
scalesGetter scaleclient.ScalesGetter,
ns, name string,
size uint,
wait bool,
kind schema.GroupKind,
gvr schema.GroupVersionResource,
) error {
ginkgo.By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
if err := testutils.ScaleResourceWithRetries(scalesGetter, ns, name, size, gvr); err != nil {
return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err)
}
if !wait {
return nil
}
return WaitForControlledPodsRunning(clientset, ns, name, kind)
}
// DeleteResourceAndWaitForGC deletes only the given resource and waits for GC to delete the pods.
func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns, name string) error {
ginkgo.By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", kind, name, ns))
rtObject, err := GetRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
if apierrors.IsNotFound(err) {
framework.Logf("%v %s not found: %v", kind, name, err)
return nil
}
return err
}
selector, err := GetSelectorFromRuntimeObject(rtObject)
if err != nil {
return err
}
replicas, err := GetReplicasFromRuntimeObject(rtObject)
if err != nil {
return err
}
ps, err := testutils.NewPodStore(c, ns, selector, fields.Everything())
if err != nil {
return err
}
defer ps.Stop()
falseVar := false
deleteOption := metav1.DeleteOptions{OrphanDependents: &falseVar}
startTime := time.Now()
if err := testutils.DeleteResourceWithRetries(c, kind, ns, name, deleteOption); err != nil {
return err
}
deleteTime := time.Since(startTime)
framework.Logf("Deleting %v %s took: %v", kind, name, deleteTime)
var interval, timeout time.Duration
switch {
case replicas < 100:
interval = 100 * time.Millisecond
case replicas < 1000:
interval = 1 * time.Second
default:
interval = 10 * time.Second
}
if replicas < 5000 {
timeout = 10 * time.Minute
} else {
timeout = time.Duration(replicas/gcThroughput) * time.Second
// gcThroughput is pretty strict now, add a bit more to it
timeout = timeout + 3*time.Minute
}
err = waitForPodsInactive(ps, interval, timeout)
if err != nil {
return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
}
terminatePodTime := time.Since(startTime) - deleteTime
framework.Logf("Terminating %v %s pods took: %v", kind, name, terminatePodTime)
// In gce, at any point, a small percentage of nodes can disappear for
// ~10 minutes due to hostError. 20 minutes should be long enough to
// restart the VM in that case and delete the pod.
err = waitForPodsGone(ps, interval, 20*time.Minute)
if err != nil {
return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
}
return nil
}
// waitForPodsGone waits until there are no pods left in the PodStore.
func waitForPodsGone(ps *testutils.PodStore, interval, timeout time.Duration) error {
var pods []*v1.Pod
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
if pods = ps.List(); len(pods) == 0 {
return true, nil
}
return false, nil
})
if err == wait.ErrWaitTimeout {
for _, pod := range pods {
framework.Logf("ERROR: Pod %q still exists. Node: %q", pod.Name, pod.Spec.NodeName)
}
return fmt.Errorf("there are %d pods left. E.g. %q on node %q", len(pods), pods[0].Name, pods[0].Spec.NodeName)
}
return err
}
// waitForPodsInactive waits until there are no active pods left in the PodStore.
// This is to make a fair comparison of deletion time between DeleteRCAndPods
// and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
// when the pod is inactive.
func waitForPodsInactive(ps *testutils.PodStore, interval, timeout time.Duration) error {
var activePods []*v1.Pod
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pods := ps.List()
activePods = controller.FilterActivePods(pods)
if len(activePods) != 0 {
return false, nil
}
return true, nil
})
if err == wait.ErrWaitTimeout {
for _, pod := range activePods {
framework.Logf("ERROR: Pod %q running on %q is still active", pod.Name, pod.Spec.NodeName)
}
return fmt.Errorf("there are %d active pods. E.g. %q on node %q", len(activePods), activePods[0].Name, activePods[0].Spec.NodeName)
}
return err
}
// WaitForControlledPodsRunning waits up to 10 minutes for pods to become Running.
func WaitForControlledPodsRunning(c clientset.Interface, ns, name string, kind schema.GroupKind) error {
rtObject, err := GetRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
return err
}
selector, err := GetSelectorFromRuntimeObject(rtObject)
if err != nil {
return err
}
replicas, err := GetReplicasFromRuntimeObject(rtObject)
if err != nil {
return err
}
err = testutils.WaitForEnoughPodsWithLabelRunning(c, ns, selector, int(replicas))
if err != nil {
return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", name, err)
}
return nil
}
// WaitForControlledPods waits up to podListTimeout for getting pods of the specified controller name and returns them.
func WaitForControlledPods(c clientset.Interface, ns, name string, kind schema.GroupKind) (pods *v1.PodList, err error) {
rtObject, err := GetRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
return nil, err
}
selector, err := GetSelectorFromRuntimeObject(rtObject)
if err != nil {
return nil, err
}
return e2epod.WaitForPodsWithLabel(c, ns, selector)
}
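Taken together, resources.go gives the new package a small, self-contained surface: ScaleResource, DeleteResourceAndWaitForGC, and the controlled-pod waiters. A usage sketch built only from the signatures above (the package name, function name, and replica count are illustrative):

package resourceusage // illustrative name

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	clientset "k8s.io/client-go/kubernetes"
	scaleclient "k8s.io/client-go/scale"
	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
)

// scaleThenReap scales an RC up, blocking until its pods run, then deletes
// it and waits for the garbage collector to reap the pods.
func scaleThenReap(c clientset.Interface, sg scaleclient.ScalesGetter, ns, name string) error {
	rcKind := schema.GroupKind{Kind: "ReplicationController"}
	gvr := v1.SchemeGroupVersion.WithResource("replicationcontrollers")
	// wait=true makes ScaleResource block in WaitForControlledPodsRunning.
	if err := e2eresource.ScaleResource(c, sg, ns, name, 3, true, rcKind, gvr); err != nil {
		return err
	}
	return e2eresource.DeleteResourceAndWaitForGC(c, rcKind, ns, name)
}

Note the timeout derivation in DeleteResourceAndWaitForGC: below 5000 replicas the GC wait is a flat 10 minutes; at 10000 replicas it becomes 10000/gcThroughput = 1000 seconds plus the 3-minute cushion, roughly 19.7 minutes.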

View File

@@ -52,7 +52,6 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
@@ -61,7 +60,6 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
watchtools "k8s.io/client-go/tools/watch"
@@ -79,7 +77,6 @@ import (
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
)
@@ -146,10 +143,6 @@ const (
// SnapshotCreateTimeout is how long for snapshot to create snapshotContent.
SnapshotCreateTimeout = 5 * time.Minute
// Number of objects that gc can delete in a second.
// GC issues 2 requests for a single delete.
gcThroughput = 10
// Minimal number of nodes for the cluster to be considered large.
largeClusterThreshold = 100
@@ -1133,139 +1126,6 @@ func NodeHasTaint(c clientset.Interface, nodeName string, taint *v1.Taint) (bool
return true, nil
}
// ScaleResource scales a resource to the given size.
func ScaleResource(
clientset clientset.Interface,
scalesGetter scaleclient.ScalesGetter,
ns, name string,
size uint,
wait bool,
kind schema.GroupKind,
gvr schema.GroupVersionResource,
) error {
ginkgo.By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
if err := testutils.ScaleResourceWithRetries(scalesGetter, ns, name, size, gvr); err != nil {
return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err)
}
if !wait {
return nil
}
return e2epod.WaitForControlledPodsRunning(clientset, ns, name, kind)
}
// DeleteResourceAndWaitForGC deletes only the given resource and waits for GC to delete the pods.
func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns, name string) error {
ginkgo.By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", kind, name, ns))
rtObject, err := e2eresource.GetRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
if apierrors.IsNotFound(err) {
Logf("%v %s not found: %v", kind, name, err)
return nil
}
return err
}
selector, err := e2eresource.GetSelectorFromRuntimeObject(rtObject)
if err != nil {
return err
}
replicas, err := e2eresource.GetReplicasFromRuntimeObject(rtObject)
if err != nil {
return err
}
ps, err := testutils.NewPodStore(c, ns, selector, fields.Everything())
if err != nil {
return err
}
defer ps.Stop()
falseVar := false
deleteOption := metav1.DeleteOptions{OrphanDependents: &falseVar}
startTime := time.Now()
if err := testutils.DeleteResourceWithRetries(c, kind, ns, name, deleteOption); err != nil {
return err
}
deleteTime := time.Since(startTime)
Logf("Deleting %v %s took: %v", kind, name, deleteTime)
var interval, timeout time.Duration
switch {
case replicas < 100:
interval = 100 * time.Millisecond
case replicas < 1000:
interval = 1 * time.Second
default:
interval = 10 * time.Second
}
if replicas < 5000 {
timeout = 10 * time.Minute
} else {
timeout = time.Duration(replicas/gcThroughput) * time.Second
// gcThroughput is pretty strict now, add a bit more to it
timeout = timeout + 3*time.Minute
}
err = waitForPodsInactive(ps, interval, timeout)
if err != nil {
return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
}
terminatePodTime := time.Since(startTime) - deleteTime
Logf("Terminating %v %s pods took: %v", kind, name, terminatePodTime)
// In gce, at any point, a small percentage of nodes can disappear for
// ~10 minutes due to hostError. 20 minutes should be long enough to
// restart the VM in that case and delete the pod.
err = waitForPodsGone(ps, interval, 20*time.Minute)
if err != nil {
return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
}
return nil
}
// waitForPodsGone waits until there are no pods left in the PodStore.
func waitForPodsGone(ps *testutils.PodStore, interval, timeout time.Duration) error {
var pods []*v1.Pod
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
if pods = ps.List(); len(pods) == 0 {
return true, nil
}
return false, nil
})
if err == wait.ErrWaitTimeout {
for _, pod := range pods {
Logf("ERROR: Pod %q still exists. Node: %q", pod.Name, pod.Spec.NodeName)
}
return fmt.Errorf("there are %d pods left. E.g. %q on node %q", len(pods), pods[0].Name, pods[0].Spec.NodeName)
}
return err
}
// waitForPodsInactive waits until there are no active pods left in the PodStore.
// This is to make a fair comparison of deletion time between DeleteRCAndPods
// and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
// when the pod is inactive.
func waitForPodsInactive(ps *testutils.PodStore, interval, timeout time.Duration) error {
var activePods []*v1.Pod
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pods := ps.List()
activePods = controller.FilterActivePods(pods)
if len(activePods) != 0 {
return false, nil
}
return true, nil
})
if err == wait.ErrWaitTimeout {
for _, pod := range activePods {
Logf("ERROR: Pod %q running on %q is still active", pod.Name, pod.Spec.NodeName)
}
return fmt.Errorf("there are %d active pods. E.g. %q on node %q", len(activePods), activePods[0].Name, activePods[0].Spec.NodeName)
}
return err
}
// RunHostCmd runs the given cmd in the context of the given pod using `kubectl exec`
// inside of a shell.
func RunHostCmd(ns, name, cmd string) (string, error) {

View File

@@ -75,6 +75,7 @@ go_library(
"//test/e2e/framework/pod:go_default_library",
"//test/e2e/framework/providers/gce:go_default_library",
"//test/e2e/framework/rc:go_default_library",
"//test/e2e/framework/resource:go_default_library",
"//test/e2e/framework/service:go_default_library",
"//test/e2e/framework/skipper:go_default_library",
"//test/e2e/framework/ssh:go_default_library",

View File

@@ -35,6 +35,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)
@@ -102,7 +103,7 @@ var _ = SIGDescribe("ClusterDns [Feature:Example]", func() {
// wait for objects
for _, ns := range namespaces {
-e2epod.WaitForControlledPodsRunning(c, ns.Name, backendRcName, api.Kind("ReplicationController"))
+e2eresource.WaitForControlledPodsRunning(c, ns.Name, backendRcName, api.Kind("ReplicationController"))
framework.WaitForService(c, ns.Name, backendSvcName, true, framework.Poll, framework.ServiceStartTimeout)
}
// it is not enough that pods are running because they may be set to running, but

View File

@@ -50,6 +50,7 @@ go_library(
"//test/e2e/framework/pv:go_default_library",
"//test/e2e/framework/rc:go_default_library",
"//test/e2e/framework/replicaset:go_default_library",
"//test/e2e/framework/resource:go_default_library",
"//test/e2e/framework/service:go_default_library",
"//test/e2e/framework/skipper:go_default_library",
"//test/utils:go_default_library",

View File

@@ -33,6 +33,7 @@ import (
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e/framework/providers/gce"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
imageutils "k8s.io/kubernetes/test/utils/image"
@@ -143,10 +144,10 @@ func SetupNVIDIAGPUNode(f *framework.Framework, setupResourceGatherer bool) *fra
framework.ExpectNoError(err, "failed to create nvidia-driver-installer daemonset")
framework.Logf("Successfully created daemonset to install Nvidia drivers.")
-pods, err := e2epod.WaitForControlledPods(f.ClientSet, ds.Namespace, ds.Name, extensionsinternal.Kind("DaemonSet"))
+pods, err := e2eresource.WaitForControlledPods(f.ClientSet, ds.Namespace, ds.Name, extensionsinternal.Kind("DaemonSet"))
framework.ExpectNoError(err, "failed to get pods controlled by the nvidia-driver-installer daemonset")
-devicepluginPods, err := e2epod.WaitForControlledPods(f.ClientSet, "kube-system", "nvidia-gpu-device-plugin", extensionsinternal.Kind("DaemonSet"))
+devicepluginPods, err := e2eresource.WaitForControlledPods(f.ClientSet, "kube-system", "nvidia-gpu-device-plugin", extensionsinternal.Kind("DaemonSet"))
if err == nil {
framework.Logf("Adding deviceplugin addon pod.")
pods.Items = append(pods.Items, devicepluginPods.Items...)