From dd9de90b0ad1aa78c2a8dd7d5238d8f769ffe771 Mon Sep 17 00:00:00 2001 From: p0lyn0mial Date: Thu, 4 Jan 2018 14:52:25 +0100 Subject: [PATCH] the changes introduced in this commit plumbs in the generic scaler into kubectl. note that we don't change the behaviour of kubectl. For example it won't scale new resources. That's the end goal. The first step is to retrofit existing code to use the generic scaler. --- pkg/kubectl/cmd/util/BUILD | 1 + .../cmd/util/factory_object_mapping.go | 19 ++++++- pkg/kubectl/scale.go | 12 ++-- staging/src/k8s.io/client-go/scale/client.go | 1 - test/e2e/apps/daemon_restart.go | 6 +- test/e2e/examples.go | 4 +- test/e2e/framework/BUILD | 3 + test/e2e/framework/deployment_util.go | 7 ++- test/e2e/framework/framework.go | 25 +++++++++ test/e2e/framework/rc_util.go | 13 +++-- test/e2e/framework/util.go | 12 +++- test/e2e/network/service.go | 2 +- test/e2e/scalability/BUILD | 15 +++-- test/e2e/scalability/density.go | 5 +- test/e2e/scalability/load.go | 55 ++++++++++++++++--- .../equivalence_cache_predicates.go | 2 +- test/e2e/scheduling/priorities.go | 2 +- test/e2e/scheduling/rescheduler.go | 6 +- test/integration/framework/BUILD | 3 - test/integration/framework/util.go | 54 ------------------ test/utils/BUILD | 1 + test/utils/runners.go | 29 ++++++++++ 22 files changed, 177 insertions(+), 100 deletions(-) diff --git a/pkg/kubectl/cmd/util/BUILD b/pkg/kubectl/cmd/util/BUILD index 6541d3953e9..ed3d59475c3 100644 --- a/pkg/kubectl/cmd/util/BUILD +++ b/pkg/kubectl/cmd/util/BUILD @@ -78,6 +78,7 @@ go_library( "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/util/homedir:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", diff --git a/pkg/kubectl/cmd/util/factory_object_mapping.go b/pkg/kubectl/cmd/util/factory_object_mapping.go index dfd82d406c5..5c9f01f7d50 100644 --- a/pkg/kubectl/cmd/util/factory_object_mapping.go +++ b/pkg/kubectl/cmd/util/factory_object_mapping.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" restclient "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" @@ -286,7 +287,23 @@ func (f *ring1Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error) if err != nil { return nil, err } - return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset) + + // create scales getter + // TODO(p0lyn0mial): put scalesGetter to a factory + discoClient, err := f.clientAccessFactory.DiscoveryClient() + if err != nil { + return nil, err + } + restClient, err := f.clientAccessFactory.RESTClient() + if err != nil { + return nil, err + } + mapper, _ := f.Object() + resolver := scaleclient.NewDiscoveryScaleKindResolver(discoClient) + scalesGetter := scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver) + gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource) + + return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset, scalesGetter, gvk.GroupResource()) } func (f *ring1Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) { diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index 1d4165f9626..511514df6b8 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -53,7 +53,10 @@ type Scaler interface { ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) } -func ScalerFor(kind schema.GroupKind, c internalclientset.Interface) (Scaler, error) { +// ScalerFor gets a scaler for a given resource +// TODO(p0lyn0mial): remove kind and internalclientset +// TODO(p0lyn0mial): once we have only one scaler, there is no need to return an error anymore. +func ScalerFor(kind schema.GroupKind, c internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (Scaler, error) { switch kind { case api.Kind("ReplicationController"): return &ReplicationControllerScaler{c.Core()}, nil @@ -63,10 +66,9 @@ func ScalerFor(kind schema.GroupKind, c internalclientset.Interface) (Scaler, er return &JobScaler{c.Batch()}, nil // Either kind of job can be scaled with Batch interface. case apps.Kind("StatefulSet"): return &StatefulSetScaler{c.Apps()}, nil - case extensions.Kind("Deployment"), apps.Kind("Deployment"): - return &DeploymentScaler{c.Extensions()}, nil + default: + return &GenericScaler{scalesGetter, gr}, nil } - return nil, fmt.Errorf("no scaler has been implemented for %q", kind) } // ScalePrecondition describes a condition that must be true for the scale to take place @@ -533,7 +535,7 @@ func (precondition *ScalePrecondition) validateGeneric(scale *autoscalingapi.Sca } // GenericScaler can update scales for resources in a particular namespace -// TODO(o0lyn0mial): when the work on GenericScaler is done, don't +// TODO(po0lyn0mial): when the work on GenericScaler is done, don't // export the GenericScaler. Instead use ScalerFor method for getting the Scaler // also update the UTs type GenericScaler struct { diff --git a/staging/src/k8s.io/client-go/scale/client.go b/staging/src/k8s.io/client-go/scale/client.go index 07c6098620b..a8c903d9eab 100644 --- a/staging/src/k8s.io/client-go/scale/client.go +++ b/staging/src/k8s.io/client-go/scale/client.go @@ -196,7 +196,6 @@ func (c *namespacedScaleClient) Update(resource schema.GroupResource, scale *aut Body(scaleUpdateBytes). Do() if err := result.Error(); err != nil { - panic(err) return nil, fmt.Errorf("could not update the scale for %s %s: %v", resource.String(), scale.Name, err) } diff --git a/test/e2e/apps/daemon_restart.go b/test/e2e/apps/daemon_restart.go index 85266680ee7..2319dfe5731 100644 --- a/test/e2e/apps/daemon_restart.go +++ b/test/e2e/apps/daemon_restart.go @@ -257,7 +257,7 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() { // that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC // to the same size achieves this, because the scale operation advances the RC's sequence number // and awaits it to be observed and reported back in the RC's status. - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods, true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods, true) // Only check the keys, the pods can be different if the kubelet updated it. // TODO: Can it really? @@ -288,9 +288,9 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() { restarter.kill() // This is best effort to try and create pods while the scheduler is down, // since we don't know exactly when it is restarted after the kill signal. - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods+5, false)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, false)) restarter.waitUp() - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods+5, true)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, true)) }) It("Kubelet should not restart containers across restart", func() { diff --git a/test/e2e/examples.go b/test/e2e/examples.go index 7e377e203e4..6fa937c89f6 100644 --- a/test/e2e/examples.go +++ b/test/e2e/examples.go @@ -521,7 +521,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { Expect(err).NotTo(HaveOccurred()) By("scaling rethinkdb") - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, "rethinkdb-rc", 2, true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "rethinkdb-rc", 2, true) checkDbInstances() By("starting admin") @@ -564,7 +564,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { Expect(err).NotTo(HaveOccurred()) By("scaling hazelcast") - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, "hazelcast", 2, true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "hazelcast", 2, true) forEachPod("name", "hazelcast", func(pod v1.Pod) { _, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout) Expect(err).NotTo(HaveOccurred()) diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 0ecad57686d..6a773a3957b 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -110,6 +110,7 @@ go_library( "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/api/rbac/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", @@ -132,6 +133,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/cached:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", @@ -139,6 +141,7 @@ go_library( "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/rbac/v1beta1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", diff --git a/test/e2e/framework/deployment_util.go b/test/e2e/framework/deployment_util.go index 23feda770d4..d5544e1998e 100644 --- a/test/e2e/framework/deployment_util.go +++ b/test/e2e/framework/deployment_util.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + scaleclient "k8s.io/client-go/scale" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" @@ -178,8 +179,10 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er return err } -func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error { - return ScaleResource(clientset, internalClientset, ns, name, size, wait, extensionsinternal.Kind("Deployment")) +//TODO(p0lyn0mial): remove internalClientset and kind. +//TODO(p0lyn0mial): update the callers. +func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { + return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments")) } func RunDeployment(config testutils.DeploymentConfig) error { diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index e628accaa28..f5341d79c00 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -28,14 +28,19 @@ import ( "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/tools/clientcmd" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -67,6 +72,8 @@ type Framework struct { AggregatorClient *aggregatorclient.Clientset ClientPool dynamic.ClientPool + ScalesGetter scaleclient.ScalesGetter + SkipNamespaceCreation bool // Whether to skip creating a namespace Namespace *v1.Namespace // Every test has at least one namespace unless creation is skipped namespacesToDelete []*v1.Namespace // Some tests have more than one. @@ -161,6 +168,24 @@ func (f *Framework) BeforeEach() { f.AggregatorClient, err = aggregatorclient.NewForConfig(config) Expect(err).NotTo(HaveOccurred()) f.ClientPool = dynamic.NewClientPool(config, legacyscheme.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) + + // create scales getter, set GroupVersion and NegotiatedSerializer to default values + // as they are required when creating a REST client. + if config.GroupVersion == nil { + config.GroupVersion = &schema.GroupVersion{} + } + if config.NegotiatedSerializer == nil { + config.NegotiatedSerializer = legacyscheme.Codecs + } + restClient, err := rest.RESTClientFor(config) + Expect(err).NotTo(HaveOccurred()) + discoClient, err := discovery.NewDiscoveryClientForConfig(config) + Expect(err).NotTo(HaveOccurred()) + cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured) + resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient) + f.ScalesGetter = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver) + if ProviderIs("kubemark") && TestContext.KubemarkExternalKubeConfig != "" && TestContext.CloudConfig.KubemarkController == nil { externalConfig, err := clientcmd.BuildConfigFromFlags("", TestContext.KubemarkExternalKubeConfig) externalConfig.QPS = f.Options.ClientQPS diff --git a/test/e2e/framework/rc_util.go b/test/e2e/framework/rc_util.go index d0d1982b535..8bbdb6f4a9b 100644 --- a/test/e2e/framework/rc_util.go +++ b/test/e2e/framework/rc_util.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/api/testapi" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -84,7 +85,9 @@ func RcByNameContainer(name string, replicas int32, image string, labels map[str // ScaleRCByLabels scales an RC via ns/label lookup. If replicas == 0 it waits till // none are running, otherwise it does what a synchronous scale operation would do. -func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, ns string, l map[string]string, replicas uint) error { +//TODO(p0lyn0mial): remove internalClientset. +//TODO(p0lyn0mial): update the callers. +func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns string, l map[string]string, replicas uint) error { listOpts := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(l)).String()} rcs, err := clientset.CoreV1().ReplicationControllers(ns).List(listOpts) if err != nil { @@ -96,7 +99,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl Logf("Scaling %v RCs with labels %v in ns %v to %v replicas.", len(rcs.Items), l, ns, replicas) for _, labelRC := range rcs.Items { name := labelRC.Name - if err := ScaleRC(clientset, internalClientset, ns, name, replicas, false); err != nil { + if err := ScaleRC(clientset, internalClientset, scalesGetter, ns, name, replicas, false); err != nil { return err } rc, err := clientset.CoreV1().ReplicationControllers(ns).Get(name, metav1.GetOptions{}) @@ -156,8 +159,10 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl return DeleteResourceAndPods(clientset, internalClientset, api.Kind("ReplicationController"), ns, name) } -func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error { - return ScaleResource(clientset, internalClientset, ns, name, size, wait, api.Kind("ReplicationController")) +//TODO(p0lyn0mial): remove internalClientset. +//TODO(p0lyn0mial): update the callers. +func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { + return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers")) } func RunRC(config testutils.RCConfig) error { diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 637a9eace0b..8384c774af0 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -74,6 +74,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" + scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/testapi" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -2682,20 +2683,25 @@ func RemoveAvoidPodsOffNode(c clientset.Interface, nodeName string) { ExpectNoError(err) } -func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Scaler, error) { - return kubectl.ScalerFor(kind, internalClientset) +//TODO(p0lyn0mial): remove internalClientset and kind +func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (kubectl.Scaler, error) { + return kubectl.ScalerFor(kind, internalClientset, scalesGetter, gr) } +//TODO(p0lyn0mial): remove internalClientset and kind. +//TODO(p0lyn0mial): update the callers. func ScaleResource( clientset clientset.Interface, internalClientset internalclientset.Interface, + scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool, kind schema.GroupKind, + gr schema.GroupResource, ) error { By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size)) - scaler, err := getScalerForKind(internalClientset, kind) + scaler, err := getScalerForKind(internalClientset, kind, scalesGetter, gr) if err != nil { return err } diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 759599a9937..44712d0d4cf 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -1265,7 +1265,7 @@ var _ = SIGDescribe("Services", func() { } By("Scaling down replication controller to zero") - framework.ScaleRC(f.ClientSet, f.InternalClientset, t.Namespace, rcSpec.Name, 0, false) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false) By("Update service to not tolerate unready services") _, err = framework.UpdateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) { diff --git a/test/e2e/scalability/BUILD b/test/e2e/scalability/BUILD index 34dff1f866a..fc6e3cee361 100644 --- a/test/e2e/scalability/BUILD +++ b/test/e2e/scalability/BUILD @@ -1,9 +1,4 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", @@ -14,7 +9,9 @@ go_library( "load.go", ], importpath = "k8s.io/kubernetes/test/e2e/scalability", + visibility = ["//visibility:public"], deps = [ + "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/batch:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/apis/extensions:go_default_library", @@ -26,6 +23,7 @@ go_library( "//vendor/github.com/onsi/gomega:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", @@ -38,8 +36,12 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/cached:go_default_library", + "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/transport:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", @@ -57,4 +59,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go index f10671e10b6..6e49d068599 100644 --- a/test/e2e/scalability/density.go +++ b/test/e2e/scalability/density.go @@ -528,7 +528,7 @@ var _ = SIGDescribe("Density", func() { podThroughput := 20 timeout := time.Duration(totalPods/podThroughput)*time.Second + 3*time.Minute // createClients is defined in load.go - clients, internalClients, err := createClients(numberOfCollections) + clients, internalClients, scalesClients, err := createClients(numberOfCollections) for i := 0; i < numberOfCollections; i++ { nsName := namespaces[i].Name secretNames := []string{} @@ -559,6 +559,7 @@ var _ = SIGDescribe("Density", func() { baseConfig := &testutils.RCConfig{ Client: clients[i], InternalClient: internalClients[i], + ScalesGetter: scalesClients[i], Image: framework.GetPauseImageName(f.ClientSet), Name: name, Namespace: nsName, @@ -590,7 +591,7 @@ var _ = SIGDescribe("Density", func() { } // Single client is running out of http2 connections in delete phase, hence we need more. - clients, internalClients, err = createClients(2) + clients, internalClients, _, err = createClients(2) dConfig := DensityTestConfig{ ClientSets: clients, diff --git a/test/e2e/scalability/load.go b/test/e2e/scalability/load.go index c696de42724..6e15dbc8803 100644 --- a/test/e2e/scalability/load.go +++ b/test/e2e/scalability/load.go @@ -28,14 +28,18 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/apis/batch" @@ -48,6 +52,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "k8s.io/client-go/dynamic" + "k8s.io/kubernetes/pkg/api/legacyscheme" ) const ( @@ -309,9 +315,11 @@ var _ = SIGDescribe("Load capacity", func() { } }) -func createClients(numberOfClients int) ([]clientset.Interface, []internalclientset.Interface, error) { +func createClients(numberOfClients int) ([]clientset.Interface, []internalclientset.Interface, []scaleclient.ScalesGetter, error) { clients := make([]clientset.Interface, numberOfClients) internalClients := make([]internalclientset.Interface, numberOfClients) + scalesClients := make([]scaleclient.ScalesGetter, numberOfClients) + for i := 0; i < numberOfClients; i++ { config, err := framework.LoadConfig() Expect(err).NotTo(HaveOccurred()) @@ -327,11 +335,11 @@ func createClients(numberOfClients int) ([]clientset.Interface, []internalclient // each client here. transportConfig, err := config.TransportConfig() if err != nil { - return nil, nil, err + return nil, nil, nil, err } tlsConfig, err := transport.TLSConfigFor(transportConfig) if err != nil { - return nil, nil, err + return nil, nil, nil, err } config.Transport = utilnet.SetTransportDefaults(&http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -349,16 +357,37 @@ func createClients(numberOfClients int) ([]clientset.Interface, []internalclient c, err := clientset.NewForConfig(config) if err != nil { - return nil, nil, err + return nil, nil, nil, err } clients[i] = c internalClient, err := internalclientset.NewForConfig(config) if err != nil { - return nil, nil, err + return nil, nil, nil, err } internalClients[i] = internalClient + + // create scale client, if GroupVersion or NegotiatedSerializer are not set + // assign default values - these fields are mandatory (required by RESTClientFor). + if config.GroupVersion == nil { + config.GroupVersion = &schema.GroupVersion{} + } + if config.NegotiatedSerializer == nil { + config.NegotiatedSerializer = legacyscheme.Codecs + } + restClient, err := restclient.RESTClientFor(config) + if err != nil { + return nil, nil, nil, err + } + discoClient, err := discovery.NewDiscoveryClientForConfig(config) + if err != nil { + return nil, nil, nil, err + } + cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured) + resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient) + scalesClients[i] = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver) } - return clients, internalClients, nil + return clients, internalClients, scalesClients, nil } func computePodCounts(total int) (int, int, int) { @@ -405,12 +434,13 @@ func generateConfigs( // Create a number of clients to better simulate real usecase // where not everyone is using exactly the same client. rcsPerClient := 20 - clients, internalClients, err := createClients((len(configs) + rcsPerClient - 1) / rcsPerClient) + clients, internalClients, scalesClients, err := createClients((len(configs) + rcsPerClient - 1) / rcsPerClient) framework.ExpectNoError(err) for i := 0; i < len(configs); i++ { configs[i].SetClient(clients[i%len(clients)]) configs[i].SetInternalClient(internalClients[i%len(internalClients)]) + configs[i].SetScalesClient(scalesClients[i%len(clients)]) } for i := 0; i < len(secretConfigs); i++ { secretConfigs[i].Client = clients[i%len(clients)] @@ -590,7 +620,16 @@ func scaleResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, scaling sleepUpTo(scalingTime) newSize := uint(rand.Intn(config.GetReplicas()) + config.GetReplicas()/2) framework.ExpectNoError(framework.ScaleResource( - config.GetClient(), config.GetInternalClient(), config.GetNamespace(), config.GetName(), newSize, true, config.GetKind()), + config.GetClient(), + config.GetInternalClient(), + config.GetScalesGetter(), + config.GetNamespace(), + config.GetName(), + newSize, + true, + config.GetKind(), + config.GetGroupResource(), + ), fmt.Sprintf("scaling %v %v", config.GetKind(), config.GetName())) selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.GetName()})) diff --git a/test/e2e/scheduling/equivalence_cache_predicates.go b/test/e2e/scheduling/equivalence_cache_predicates.go index 79aaf5d9d8b..3d551476399 100644 --- a/test/e2e/scheduling/equivalence_cache_predicates.go +++ b/test/e2e/scheduling/equivalence_cache_predicates.go @@ -155,7 +155,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() { By("Trying to schedule another equivalent Pod should fail due to node label has been removed.") // use scale to create another equivalent pod and wait for failure event WaitForSchedulerAfterAction(f, func() error { - err := framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, affinityRCName, uint(replica+1), false) + err := framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false) return err }, affinityRCName, false) // and this new pod should be rejected since node label has been updated diff --git a/test/e2e/scheduling/priorities.go b/test/e2e/scheduling/priorities.go index 1c34ea8998c..f3643b6cb13 100644 --- a/test/e2e/scheduling/priorities.go +++ b/test/e2e/scheduling/priorities.go @@ -196,7 +196,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() { By(fmt.Sprintf("Scale the RC: %s to len(nodeList.Item)-1 : %v.", rc.Name, len(nodeList.Items)-1)) - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rc.Name, uint(len(nodeList.Items)-1), true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rc.Name, uint(len(nodeList.Items)-1), true) testPods, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{ LabelSelector: "name=scheduler-priority-avoid-pod", }) diff --git a/test/e2e/scheduling/rescheduler.go b/test/e2e/scheduling/rescheduler.go index 512e8b3c6f0..0d1107ccbe1 100644 --- a/test/e2e/scheduling/rescheduler.go +++ b/test/e2e/scheduling/rescheduler.go @@ -68,8 +68,8 @@ var _ = SIGDescribe("Rescheduler [Serial]", func() { deployment := deployments.Items[0] replicas := uint(*(deployment.Spec.Replicas)) - err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, metav1.NamespaceSystem, deployment.Name, replicas+1, true) - defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, metav1.NamespaceSystem, deployment.Name, replicas, true)) + err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas+1, true) + defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas, true)) framework.ExpectNoError(err) }) @@ -80,7 +80,7 @@ func reserveAllCpu(f *framework.Framework, id string, millicores int) error { replicas := millicores / 100 reserveCpu(f, id, 1, 100) - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.Namespace.Name, id, uint(replicas), false)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id, uint(replicas), false)) for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) { pods, err := framework.GetPodsInNamespace(f.ClientSet, f.Namespace.Name, framework.ImagePullerLabels) diff --git a/test/integration/framework/BUILD b/test/integration/framework/BUILD index 07da3bc1141..27a3fb8e412 100644 --- a/test/integration/framework/BUILD +++ b/test/integration/framework/BUILD @@ -22,13 +22,10 @@ go_library( "//pkg/api/legacyscheme:go_default_library", "//pkg/api/testapi:go_default_library", "//pkg/apis/batch:go_default_library", - "//pkg/apis/core:go_default_library", "//pkg/apis/policy/v1beta1:go_default_library", - "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/replication:go_default_library", "//pkg/generated/openapi:go_default_library", - "//pkg/kubectl:go_default_library", "//pkg/kubelet/client:go_default_library", "//pkg/master:go_default_library", "//pkg/util/env:go_default_library", diff --git a/test/integration/framework/util.go b/test/integration/framework/util.go index afb1d68961e..c9d42a99c48 100644 --- a/test/integration/framework/util.go +++ b/test/integration/framework/util.go @@ -19,22 +19,13 @@ limitations under the License. package framework import ( - "io/ioutil" "net/http/httptest" "strings" "testing" - "time" - - "github.com/golang/glog" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/api/testapi" - api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/kubectl" ) const ( @@ -80,48 +71,3 @@ func CreateTestingNamespace(baseName string, apiserver *httptest.Server, t *test func DeleteTestingNamespace(ns *v1.Namespace, apiserver *httptest.Server, t *testing.T) { // TODO: Remove all resources from a given namespace once we implement CreateTestingNamespace. } - -// RCFromManifest reads a .json file and returns the rc in it. -func RCFromManifest(fileName string) *v1.ReplicationController { - data, err := ioutil.ReadFile(fileName) - if err != nil { - glog.Fatalf("Unexpected error reading rc manifest %v", err) - } - var controller v1.ReplicationController - if err := runtime.DecodeInto(testapi.Default.Codec(), data, &controller); err != nil { - glog.Fatalf("Unexpected error reading rc manifest %v", err) - } - return &controller -} - -// StopRC stops the rc via kubectl's stop library -func StopRC(rc *v1.ReplicationController, clientset internalclientset.Interface) error { - reaper, err := kubectl.ReaperFor(api.Kind("ReplicationController"), clientset) - if err != nil || reaper == nil { - return err - } - err = reaper.Stop(rc.Namespace, rc.Name, 0, nil) - if err != nil { - return err - } - return nil -} - -// ScaleRC scales the given rc to the given replicas. -func ScaleRC(name, ns string, replicas int32, clientset internalclientset.Interface) (*api.ReplicationController, error) { - scaler, err := kubectl.ScalerFor(api.Kind("ReplicationController"), clientset) - if err != nil { - return nil, err - } - retry := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout} - waitForReplicas := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout} - err = scaler.Scale(ns, name, uint(replicas), nil, retry, waitForReplicas) - if err != nil { - return nil, err - } - scaled, err := clientset.Core().ReplicationControllers(ns).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return scaled, nil -} diff --git a/test/utils/BUILD b/test/utils/BUILD index a2e6045933f..da7eeadab95 100644 --- a/test/utils/BUILD +++ b/test/utils/BUILD @@ -44,6 +44,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], diff --git a/test/utils/runners.go b/test/utils/runners.go index 2eaf28e48e2..1d71a3eeb62 100644 --- a/test/utils/runners.go +++ b/test/utils/runners.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/util/workqueue" batchinternal "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" @@ -105,16 +106,20 @@ type RunObjectConfig interface { GetKind() schema.GroupKind GetClient() clientset.Interface GetInternalClient() internalclientset.Interface + GetScalesGetter() scaleclient.ScalesGetter SetClient(clientset.Interface) SetInternalClient(internalclientset.Interface) + SetScalesClient(scaleclient.ScalesGetter) GetReplicas() int GetLabelValue(string) (string, bool) + GetGroupResource() schema.GroupResource } type RCConfig struct { Affinity *v1.Affinity Client clientset.Interface InternalClient internalclientset.Interface + ScalesGetter scaleclient.ScalesGetter Image string Command []string Name string @@ -277,6 +282,10 @@ func (config *DeploymentConfig) GetKind() schema.GroupKind { return extensionsinternal.Kind("Deployment") } +func (config *DeploymentConfig) GetGroupResource() schema.GroupResource { + return extensionsinternal.Resource("deployments") +} + func (config *DeploymentConfig) create() error { deployment := &extensions.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -344,6 +353,10 @@ func (config *ReplicaSetConfig) GetKind() schema.GroupKind { return extensionsinternal.Kind("ReplicaSet") } +func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource { + return extensionsinternal.Resource("replicasets") +} + func (config *ReplicaSetConfig) create() error { rs := &extensions.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ @@ -411,6 +424,10 @@ func (config *JobConfig) GetKind() schema.GroupKind { return batchinternal.Kind("Job") } +func (config *JobConfig) GetGroupResource() schema.GroupResource { + return batchinternal.Resource("jobs") +} + func (config *JobConfig) create() error { job := &batch.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -482,6 +499,10 @@ func (config *RCConfig) GetKind() schema.GroupKind { return api.Kind("ReplicationController") } +func (config *RCConfig) GetGroupResource() schema.GroupResource { + return api.Resource("replicationcontrollers") +} + func (config *RCConfig) GetClient() clientset.Interface { return config.Client } @@ -490,6 +511,10 @@ func (config *RCConfig) GetInternalClient() internalclientset.Interface { return config.InternalClient } +func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter { + return config.ScalesGetter +} + func (config *RCConfig) SetClient(c clientset.Interface) { config.Client = c } @@ -498,6 +523,10 @@ func (config *RCConfig) SetInternalClient(c internalclientset.Interface) { config.InternalClient = c } +func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) { + config.ScalesGetter = getter +} + func (config *RCConfig) GetReplicas() int { return config.Replicas }