Fix HPA E2E CRD test

This commit is contained in:
Piotr Nosek 2022-09-08 22:26:10 +00:00
parent 127f33f63d
commit 96ff1b1bcb
6 changed files with 237 additions and 32 deletions

View File

@ -246,7 +246,6 @@ func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
// all metrics computed. // all metrics computed.
func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale, func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) { metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
if scale.Status.Selector == "" { if scale.Status.Selector == "" {
errMsg := "selector is required" errMsg := "selector is required"
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg) a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)

View File

@ -95,6 +95,8 @@ var _ = SIGDescribe("[Feature:HPA] Horizontal pod autoscaling (scale resource: C
minPods: 1, minPods: 1,
maxPods: 2, maxPods: 2,
firstScale: 2, firstScale: 2,
resourceType: cpuResource,
metricTargetType: utilizationMetricType,
} }
st.run("rc-light", e2eautoscaling.KindRC, f) st.run("rc-light", e2eautoscaling.KindRC, f)
}) })
@ -107,6 +109,8 @@ var _ = SIGDescribe("[Feature:HPA] Horizontal pod autoscaling (scale resource: C
minPods: 1, minPods: 1,
maxPods: 2, maxPods: 2,
firstScale: 1, firstScale: 1,
resourceType: cpuResource,
metricTargetType: utilizationMetricType,
} }
st.run("rc-light", e2eautoscaling.KindRC, f) st.run("rc-light", e2eautoscaling.KindRC, f)
}) })
@ -126,7 +130,7 @@ var _ = SIGDescribe("[Feature:HPA] Horizontal pod autoscaling (scale resource: C
ginkgo.Describe("CustomResourceDefinition", func() { ginkgo.Describe("CustomResourceDefinition", func() {
ginkgo.It("Should scale with a CRD targetRef", func() { ginkgo.It("Should scale with a CRD targetRef", func() {
st := &HPAScaleTest{ scaleTest := &HPAScaleTest{
initPods: 1, initPods: 1,
initCPUTotal: 150, initCPUTotal: 150,
perPodCPURequest: 200, perPodCPURequest: 200,
@ -134,9 +138,10 @@ var _ = SIGDescribe("[Feature:HPA] Horizontal pod autoscaling (scale resource: C
minPods: 1, minPods: 1,
maxPods: 2, maxPods: 2,
firstScale: 2, firstScale: 2,
targetRef: e2eautoscaling.CustomCRDTargetRef(), resourceType: cpuResource,
metricTargetType: utilizationMetricType,
} }
st.run("crd-light", e2eautoscaling.KindCRD, f) scaleTest.run("foo-crd", e2eautoscaling.KindCRD, f)
}) })
}) })
}) })
@ -179,7 +184,6 @@ type HPAScaleTest struct {
cpuBurst int cpuBurst int
memBurst int memBurst int
secondScale int32 secondScale int32
targetRef autoscalingv2.CrossVersionObjectReference
resourceType v1.ResourceName resourceType v1.ResourceName
metricTargetType autoscalingv2.MetricTargetType metricTargetType autoscalingv2.MetricTargetType
} }

View File

@ -26,22 +26,29 @@ import (
autoscalingv1 "k8s.io/api/autoscaling/v1" autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2" autoscalingv2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
crdclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
e2erc "k8s.io/kubernetes/test/e2e/framework/rc" e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource" e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service" e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
testutils "k8s.io/kubernetes/test/utils" testutils "k8s.io/kubernetes/test/utils"
utilpointer "k8s.io/utils/pointer"
"github.com/onsi/ginkgo/v2" "github.com/onsi/ginkgo/v2"
scaleclient "k8s.io/client-go/scale"
imageutils "k8s.io/kubernetes/test/utils/image" imageutils "k8s.io/kubernetes/test/utils/image"
) )
@ -59,11 +66,17 @@ const (
rcIsNil = "ERROR: replicationController = nil" rcIsNil = "ERROR: replicationController = nil"
deploymentIsNil = "ERROR: deployment = nil" deploymentIsNil = "ERROR: deployment = nil"
rsIsNil = "ERROR: replicaset = nil" rsIsNil = "ERROR: replicaset = nil"
crdIsNil = "ERROR: CRD = nil"
invalidKind = "ERROR: invalid workload kind for resource consumer" invalidKind = "ERROR: invalid workload kind for resource consumer"
customMetricName = "QPS" customMetricName = "QPS"
serviceInitializationTimeout = 2 * time.Minute serviceInitializationTimeout = 2 * time.Minute
serviceInitializationInterval = 15 * time.Second serviceInitializationInterval = 15 * time.Second
megabytes = 1024 * 1024 megabytes = 1024 * 1024
crdVersion = "v1"
crdKind = "TestCRD"
crdGroup = "autoscalinge2e.example.com"
crdName = "testcrd"
crdNamePlural = "testcrds"
) )
var ( var (
@ -78,7 +91,7 @@ var (
// KindReplicaSet is the GVK for ReplicaSet // KindReplicaSet is the GVK for ReplicaSet
KindReplicaSet = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "ReplicaSet"} KindReplicaSet = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "ReplicaSet"}
// KindCRD is the GVK for CRD for test purposes // KindCRD is the GVK for CRD for test purposes
KindCRD = schema.GroupVersionKind{Group: "test", Version: "v1", Kind: "TestCustomCRD"} KindCRD = schema.GroupVersionKind{Group: crdGroup, Version: crdVersion, Kind: crdKind}
) )
// ScalingDirection identifies the scale direction for HPA Behavior. // ScalingDirection identifies the scale direction for HPA Behavior.
@ -104,6 +117,9 @@ type ResourceConsumer struct {
kind schema.GroupVersionKind kind schema.GroupVersionKind
nsName string nsName string
clientSet clientset.Interface clientSet clientset.Interface
apiExtensionClient crdclientset.Interface
dynamicClient dynamic.Interface
resourceClient dynamic.ResourceInterface
scaleClient scaleclient.ScalesGetter scaleClient scaleclient.ScalesGetter
cpu chan int cpu chan int
mem chan int mem chan int
@ -177,7 +193,15 @@ func newResourceConsumer(name, nsName string, kind schema.GroupVersionKind, repl
additionalContainers = append(additionalContainers, sidecarContainer) additionalContainers = append(additionalContainers, sidecarContainer)
} }
runServiceAndWorkloadForResourceConsumer(clientset, nsName, name, kind, replicas, cpuLimit, memLimit, podAnnotations, serviceAnnotations, additionalContainers) config, err := framework.LoadConfig()
framework.ExpectNoError(err)
apiExtensionClient, err := crdclientset.NewForConfig(config)
framework.ExpectNoError(err)
dynamicClient, err := dynamic.NewForConfig(config)
framework.ExpectNoError(err)
resourceClient := dynamicClient.Resource(schema.GroupVersionResource{Group: crdGroup, Version: crdVersion, Resource: crdNamePlural}).Namespace(nsName)
runServiceAndWorkloadForResourceConsumer(clientset, resourceClient, apiExtensionClient, nsName, name, kind, replicas, cpuLimit, memLimit, podAnnotations, serviceAnnotations, additionalContainers)
controllerName := name + "-ctrl" controllerName := name + "-ctrl"
// If sidecar is enabled and busy, run service and consumer for sidecar // If sidecar is enabled and busy, run service and consumer for sidecar
if sidecarStatus == Enable && sidecarType == Busy { if sidecarStatus == Enable && sidecarType == Busy {
@ -191,7 +215,10 @@ func newResourceConsumer(name, nsName string, kind schema.GroupVersionKind, repl
kind: kind, kind: kind,
nsName: nsName, nsName: nsName,
clientSet: clientset, clientSet: clientset,
apiExtensionClient: apiExtensionClient,
scaleClient: scaleClient, scaleClient: scaleClient,
resourceClient: resourceClient,
dynamicClient: dynamicClient,
cpu: make(chan int), cpu: make(chan int),
mem: make(chan int), mem: make(chan int),
customMetric: make(chan int), customMetric: make(chan int),
@ -416,6 +443,23 @@ func (rc *ResourceConsumer) GetReplicas() int {
framework.Failf(rsIsNil) framework.Failf(rsIsNil)
} }
return int(rs.Status.ReadyReplicas) return int(rs.Status.ReadyReplicas)
case KindCRD:
deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(context.TODO(), rc.name, metav1.GetOptions{})
framework.ExpectNoError(err)
if deployment == nil {
framework.Failf(deploymentIsNil)
}
deploymentReplicas := int64(deployment.Status.ReadyReplicas)
scale, err := rc.scaleClient.Scales(rc.nsName).Get(context.TODO(), schema.GroupResource{Group: crdGroup, Resource: crdNamePlural}, rc.name, metav1.GetOptions{})
framework.ExpectNoError(err)
crdInstance, err := rc.resourceClient.Get(context.TODO(), rc.name, metav1.GetOptions{})
framework.ExpectNoError(err)
// Update custom resource's status.replicas with child Deployment's current number of ready replicas.
framework.ExpectNoError(unstructured.SetNestedField(crdInstance.Object, deploymentReplicas, "status", "replicas"))
_, err = rc.resourceClient.Update(context.TODO(), crdInstance, metav1.UpdateOptions{})
framework.ExpectNoError(err)
return int(scale.Spec.Replicas)
default: default:
framework.Failf(invalidKind) framework.Failf(invalidKind)
} }
@ -493,7 +537,14 @@ func (rc *ResourceConsumer) CleanUp() {
// Wait some time to ensure all child goroutines are finished. // Wait some time to ensure all child goroutines are finished.
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
kind := rc.kind.GroupKind() kind := rc.kind.GroupKind()
framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(rc.clientSet, kind, rc.nsName, rc.name)) if kind.Kind == crdKind {
gvr := schema.GroupVersionResource{Group: crdGroup, Version: crdVersion, Resource: crdNamePlural}
framework.ExpectNoError(e2eresource.DeleteCustomResourceAndWaitForGC(rc.clientSet, rc.dynamicClient, rc.scaleClient, gvr, rc.nsName, rc.name))
} else {
framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(rc.clientSet, kind, rc.nsName, rc.name))
}
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(context.TODO(), rc.name, metav1.DeleteOptions{})) framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(context.TODO(), rc.name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(rc.clientSet, schema.GroupKind{Kind: "ReplicationController"}, rc.nsName, rc.controllerName)) framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(rc.clientSet, schema.GroupKind{Kind: "ReplicationController"}, rc.nsName, rc.controllerName))
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(context.TODO(), rc.name+"-ctrl", metav1.DeleteOptions{})) framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(context.TODO(), rc.name+"-ctrl", metav1.DeleteOptions{}))
@ -554,7 +605,7 @@ func runServiceAndSidecarForResourceConsumer(c clientset.Interface, ns, name str
c, ns, controllerName, 1, startServiceInterval, startServiceTimeout)) c, ns, controllerName, 1, startServiceInterval, startServiceTimeout))
} }
func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64, podAnnotations, serviceAnnotations map[string]string, additionalContainers []v1.Container) { func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, resourceClient dynamic.ResourceInterface, apiExtensionClient crdclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64, podAnnotations, serviceAnnotations map[string]string, additionalContainers []v1.Container) {
ginkgo.By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas)) ginkgo.By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
_, err := createService(c, name, ns, serviceAnnotations, map[string]string{"name": name}, port, targetPort) _, err := createService(c, name, ns, serviceAnnotations, map[string]string{"name": name}, port, targetPort)
framework.ExpectNoError(err) framework.ExpectNoError(err)
@ -574,23 +625,42 @@ func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, ns, name st
AdditionalContainers: additionalContainers, AdditionalContainers: additionalContainers,
} }
dpConfig := testutils.DeploymentConfig{
RCConfig: rcConfig,
}
dpConfig.NodeDumpFunc = framework.DumpNodeDebugInfo
dpConfig.ContainerDumpFunc = e2ekubectl.LogFailedContainers
switch kind { switch kind {
case KindRC: case KindRC:
framework.ExpectNoError(e2erc.RunRC(rcConfig)) framework.ExpectNoError(e2erc.RunRC(rcConfig))
case KindDeployment: case KindDeployment:
dpConfig := testutils.DeploymentConfig{ ginkgo.By(fmt.Sprintf("Creating deployment %s in namespace %s", dpConfig.Name, dpConfig.Namespace))
RCConfig: rcConfig,
}
ginkgo.By(fmt.Sprintf("creating deployment %s in namespace %s", dpConfig.Name, dpConfig.Namespace))
dpConfig.NodeDumpFunc = framework.DumpNodeDebugInfo
dpConfig.ContainerDumpFunc = e2ekubectl.LogFailedContainers
framework.ExpectNoError(testutils.RunDeployment(dpConfig)) framework.ExpectNoError(testutils.RunDeployment(dpConfig))
case KindReplicaSet: case KindReplicaSet:
rsConfig := testutils.ReplicaSetConfig{ rsConfig := testutils.ReplicaSetConfig{
RCConfig: rcConfig, RCConfig: rcConfig,
} }
ginkgo.By(fmt.Sprintf("creating replicaset %s in namespace %s", rsConfig.Name, rsConfig.Namespace)) ginkgo.By(fmt.Sprintf("Creating replicaset %s in namespace %s", rsConfig.Name, rsConfig.Namespace))
framework.ExpectNoError(runReplicaSet(rsConfig)) framework.ExpectNoError(runReplicaSet(rsConfig))
case KindCRD:
crd := CreateCustomResourceDefinition(apiExtensionClient)
crdInstance, err := CreateCustomSubresourceInstance(ns, name, resourceClient, crd)
framework.ExpectNoError(err)
ginkgo.By(fmt.Sprintf("Creating deployment %s backing CRD in namespace %s", dpConfig.Name, dpConfig.Namespace))
framework.ExpectNoError(testutils.RunDeployment(dpConfig))
deployment, err := c.AppsV1().Deployments(dpConfig.Namespace).Get(context.TODO(), dpConfig.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
deployment.SetOwnerReferences([]metav1.OwnerReference{{
APIVersion: kind.GroupVersion().String(),
Kind: crdKind,
Name: name,
UID: crdInstance.GetUID(),
}})
_, err = c.AppsV1().Deployments(dpConfig.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
framework.ExpectNoError(err)
default: default:
framework.Failf(invalidKind) framework.Failf(invalidKind)
} }
@ -663,14 +733,6 @@ func DeleteHorizontalPodAutoscaler(rc *ResourceConsumer, autoscalerName string)
rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Delete(context.TODO(), autoscalerName, metav1.DeleteOptions{}) rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Delete(context.TODO(), autoscalerName, metav1.DeleteOptions{})
} }
func CustomCRDTargetRef() autoscalingv2.CrossVersionObjectReference {
return autoscalingv2.CrossVersionObjectReference{
Kind: "TestCustomCRD",
Name: "test-custom-crd",
APIVersion: "test/v1",
}
}
// runReplicaSet launches (and verifies correctness) of a replicaset. // runReplicaSet launches (and verifies correctness) of a replicaset.
func runReplicaSet(config testutils.ReplicaSetConfig) error { func runReplicaSet(config testutils.ReplicaSetConfig) error {
ginkgo.By(fmt.Sprintf("creating replicaset %s in namespace %s", config.Name, config.Namespace)) ginkgo.By(fmt.Sprintf("creating replicaset %s in namespace %s", config.Name, config.Namespace))
@ -846,3 +908,94 @@ const (
Busy SidecarWorkloadType = "Busy" Busy SidecarWorkloadType = "Busy"
Idle SidecarWorkloadType = "Idle" Idle SidecarWorkloadType = "Idle"
) )
func CreateCustomResourceDefinition(c crdclientset.Interface) *apiextensionsv1.CustomResourceDefinition {
crdSchema := &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{Name: crdNamePlural + "." + crdGroup},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: crdGroup,
Scope: apiextensionsv1.ResourceScope("Namespaced"),
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: crdNamePlural,
Singular: crdName,
Kind: crdKind,
ListKind: "TestCRDList",
},
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{{
Name: crdVersion,
Served: true,
Storage: true,
Schema: fixtures.AllowAllSchema(),
Subresources: &apiextensionsv1.CustomResourceSubresources{
Scale: &apiextensionsv1.CustomResourceSubresourceScale{
SpecReplicasPath: ".spec.replicas",
StatusReplicasPath: ".status.replicas",
LabelSelectorPath: utilpointer.String(".status.selector"),
},
},
}},
},
Status: apiextensionsv1.CustomResourceDefinitionStatus{},
}
// Create Custom Resource Definition if it's not present.
crd, err := c.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crdSchema.Name, metav1.GetOptions{})
if err != nil {
crd, err = c.ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), crdSchema, metav1.CreateOptions{})
framework.ExpectNoError(err)
// Wait until just created CRD appears in discovery.
err = wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
return ExistsInDiscovery(crd, c, "v1")
})
framework.ExpectNoError(err)
ginkgo.By(fmt.Sprintf("Successfully created Custom Resource Definition: %v", crd))
}
return crd
}
func ExistsInDiscovery(crd *apiextensionsv1.CustomResourceDefinition, apiExtensionsClient crdclientset.Interface, version string) (bool, error) {
groupResource, err := apiExtensionsClient.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + version)
if err != nil {
return false, err
}
for _, g := range groupResource.APIResources {
if g.Name == crd.Spec.Names.Plural {
return true, nil
}
}
return false, nil
}
func CreateCustomSubresourceInstance(namespace, name string, client dynamic.ResourceInterface, definition *apiextensionsv1.CustomResourceDefinition) (*unstructured.Unstructured, error) {
instance := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": crdGroup + "/" + crdVersion,
"kind": crdKind,
"metadata": map[string]interface{}{
"namespace": namespace,
"name": name,
},
"spec": map[string]interface{}{
"num": int64(1),
"replicas": int64(1),
},
"status": map[string]interface{}{
"replicas": int64(1),
"selector": "name=" + name,
},
},
}
instance, err := client.Create(context.TODO(), instance, metav1.CreateOptions{})
if err != nil {
framework.Logf("%#v", instance)
return nil, err
}
createdObjectMeta, err := meta.Accessor(instance)
if err != nil {
return nil, fmt.Errorf("Error while creating object meta: %v", err)
}
if len(createdObjectMeta.GetUID()) == 0 {
return nil, fmt.Errorf("Missing UUID: %v", instance)
}
ginkgo.By(fmt.Sprintf("Successfully created instance of CRD of kind %v: %v", definition.Kind, instance))
return instance, nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package resource package resource
import ( import (
"context"
"fmt" "fmt"
"time" "time"
@ -26,8 +27,10 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale" scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
@ -73,6 +76,39 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
} }
return err return err
} }
deleteObject := func() error {
background := metav1.DeletePropagationBackground
return testutils.DeleteResource(c, kind, ns, name, metav1.DeleteOptions{PropagationPolicy: &background})
}
return deleteObjectAndWaitForGC(c, rtObject, deleteObject, ns, name, kind.String())
}
// DeleteCustomResourceAndWaitForGC deletes only given resource and waits for GC to delete the pods.
// Enables to provide a custom resourece client, e.g. to fetch a CRD object.
func DeleteCustomResourceAndWaitForGC(c clientset.Interface, dynamicClient dynamic.Interface, scaleClient scaleclient.ScalesGetter, gvr schema.GroupVersionResource, ns, name string) error {
ginkgo.By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", gvr, name, ns))
resourceClient := dynamicClient.Resource(gvr).Namespace(ns)
_, err := resourceClient.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
framework.Logf("%v %s not found: %v", gvr, name, err)
return nil
}
return err
}
scaleObj, err := scaleClient.Scales(ns).Get(context.TODO(), gvr.GroupResource(), name, metav1.GetOptions{})
if err != nil {
framework.Logf("error while trying to get scale subresource of kind %v with name %v: %v", gvr, name, err)
return nil
}
deleteObject := func() error {
background := metav1.DeletePropagationBackground
return resourceClient.Delete(context.TODO(), name, metav1.DeleteOptions{PropagationPolicy: &background})
}
return deleteObjectAndWaitForGC(c, scaleObj, deleteObject, ns, name, gvr.String())
}
func deleteObjectAndWaitForGC(c clientset.Interface, rtObject runtime.Object, deleteObject func() error, ns, name, description string) error {
selector, err := GetSelectorFromRuntimeObject(rtObject) selector, err := GetSelectorFromRuntimeObject(rtObject)
if err != nil { if err != nil {
return err return err
@ -88,14 +124,18 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
} }
defer ps.Stop() defer ps.Stop()
falseVar := false
deleteOption := metav1.DeleteOptions{OrphanDependents: &falseVar}
startTime := time.Now() startTime := time.Now()
if err := testutils.DeleteResourceWithRetries(c, kind, ns, name, deleteOption); err != nil { if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
err := deleteObject()
if err == nil || apierrors.IsNotFound(err) {
return true, nil
}
return false, fmt.Errorf("failed to delete object with non-retriable error: %v", err)
}); err != nil {
return err return err
} }
deleteTime := time.Since(startTime) deleteTime := time.Since(startTime)
framework.Logf("Deleting %v %s took: %v", kind, name, deleteTime) framework.Logf("Deleting %v %s took: %v", description, name, deleteTime)
var interval, timeout time.Duration var interval, timeout time.Duration
switch { switch {
@ -119,7 +159,7 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err) return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
} }
terminatePodTime := time.Since(startTime) - deleteTime terminatePodTime := time.Since(startTime) - deleteTime
framework.Logf("Terminating %v %s pods took: %v", kind, name, terminatePodTime) framework.Logf("Terminating %v %s pods took: %v", description, name, terminatePodTime)
// In gce, at any point, small percentage of nodes can disappear for // In gce, at any point, small percentage of nodes can disappear for
// ~10 minutes due to hostError. 20 minutes should be long enough to // ~10 minutes due to hostError. 20 minutes should be long enough to

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
@ -79,6 +80,12 @@ func GetSelectorFromRuntimeObject(obj runtime.Object) (labels.Selector, error) {
return metav1.LabelSelectorAsSelector(typed.Spec.Selector) return metav1.LabelSelectorAsSelector(typed.Spec.Selector)
case *batchv1.Job: case *batchv1.Job:
return metav1.LabelSelectorAsSelector(typed.Spec.Selector) return metav1.LabelSelectorAsSelector(typed.Spec.Selector)
case *autoscalingv1.Scale:
selector, err := metav1.ParseToLabelSelector(typed.Status.Selector)
if err != nil {
return nil, fmt.Errorf("Parsing selector for: %v encountered an error: %v", obj, err)
}
return metav1.LabelSelectorAsSelector(selector)
default: default:
return nil, fmt.Errorf("Unsupported kind when getting selector: %v", obj) return nil, fmt.Errorf("Unsupported kind when getting selector: %v", obj)
} }
@ -124,6 +131,8 @@ func GetReplicasFromRuntimeObject(obj runtime.Object) (int32, error) {
return *typed.Spec.Parallelism, nil return *typed.Spec.Parallelism, nil
} }
return 0, nil return 0, nil
case *autoscalingv1.Scale:
return typed.Status.Replicas, nil
default: default:
return -1, fmt.Errorf("Unsupported kind when getting number of replicas: %v", obj) return -1, fmt.Errorf("Unsupported kind when getting number of replicas: %v", obj)
} }

View File

@ -32,7 +32,7 @@ import (
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
) )
func deleteResource(c clientset.Interface, kind schema.GroupKind, namespace, name string, options metav1.DeleteOptions) error { func DeleteResource(c clientset.Interface, kind schema.GroupKind, namespace, name string, options metav1.DeleteOptions) error {
switch kind { switch kind {
case api.Kind("Pod"): case api.Kind("Pod"):
return c.CoreV1().Pods(namespace).Delete(context.TODO(), name, options) return c.CoreV1().Pods(namespace).Delete(context.TODO(), name, options)
@ -59,7 +59,7 @@ func deleteResource(c clientset.Interface, kind schema.GroupKind, namespace, nam
func DeleteResourceWithRetries(c clientset.Interface, kind schema.GroupKind, namespace, name string, options metav1.DeleteOptions) error { func DeleteResourceWithRetries(c clientset.Interface, kind schema.GroupKind, namespace, name string, options metav1.DeleteOptions) error {
deleteFunc := func() (bool, error) { deleteFunc := func() (bool, error) {
err := deleteResource(c, kind, namespace, name, options) err := DeleteResource(c, kind, namespace, name, options)
if err == nil || apierrors.IsNotFound(err) { if err == nil || apierrors.IsNotFound(err) {
return true, nil return true, nil
} }