diff --git a/pkg/kubectl/cmd/util/BUILD b/pkg/kubectl/cmd/util/BUILD
index 6541d3953e9..ed3d59475c3 100644
--- a/pkg/kubectl/cmd/util/BUILD
+++ b/pkg/kubectl/cmd/util/BUILD
@@ -78,6 +78,7 @@ go_library(
         "//vendor/k8s.io/client-go/dynamic:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
         "//vendor/k8s.io/client-go/rest:go_default_library",
+        "//vendor/k8s.io/client-go/scale:go_default_library",
         "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
         "//vendor/k8s.io/client-go/util/homedir:go_default_library",
         "//vendor/k8s.io/utils/exec:go_default_library",
diff --git a/pkg/kubectl/cmd/util/factory_object_mapping.go b/pkg/kubectl/cmd/util/factory_object_mapping.go
index dfd82d406c5..5c9f01f7d50 100644
--- a/pkg/kubectl/cmd/util/factory_object_mapping.go
+++ b/pkg/kubectl/cmd/util/factory_object_mapping.go
@@ -37,6 +37,7 @@ import (
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/dynamic"
 	restclient "k8s.io/client-go/rest"
+	scaleclient "k8s.io/client-go/scale"
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	"k8s.io/kubernetes/pkg/apis/apps"
 	"k8s.io/kubernetes/pkg/apis/batch"
@@ -286,7 +287,23 @@ func (f *ring1Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error)
 	if err != nil {
 		return nil, err
 	}
-	return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset)
+
+	// create scales getter
+	// TODO(p0lyn0mial): put scalesGetter to a factory
+	discoClient, err := f.clientAccessFactory.DiscoveryClient()
+	if err != nil {
+		return nil, err
+	}
+	restClient, err := f.clientAccessFactory.RESTClient()
+	if err != nil {
+		return nil, err
+	}
+	mapper, _ := f.Object()
+	resolver := scaleclient.NewDiscoveryScaleKindResolver(discoClient)
+	scalesGetter := scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
+	gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
+
+	return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset, scalesGetter, gvk.GroupResource())
 }
 
 func (f *ring1Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) {
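Note: the wiring above — discovery client, REST mapper, scale-kind resolver, then `scaleclient.New` — is the same recipe this PR repeats in the e2e framework and in `load.go` below. A minimal standalone sketch of that recipe, assuming an already-populated `*rest.Config` (the package and function names here are illustrative, not part of this change):

```go
package scaleutil // illustrative package name

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	cacheddiscovery "k8s.io/client-go/discovery/cached"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
)

// newScalesGetter builds a polymorphic scale client from a rest.Config.
func newScalesGetter(config *rest.Config) (scaleclient.ScalesGetter, error) {
	// RESTClientFor rejects a config without GroupVersion and
	// NegotiatedSerializer, hence the defaulting done throughout this PR.
	if config.GroupVersion == nil {
		config.GroupVersion = &schema.GroupVersion{}
	}
	if config.NegotiatedSerializer == nil {
		config.NegotiatedSerializer = legacyscheme.Codecs
	}
	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		return nil, err
	}
	discoClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		return nil, err
	}
	// Cache discovery results: both the mapper and the resolver hit discovery.
	cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient)
	// The mapper turns kinds into resources; the resolver figures out which
	// group-version serves the /scale subresource for a given resource.
	restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured)
	resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient)
	return scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver), nil
}
```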
case apps.Kind("StatefulSet"): return &StatefulSetScaler{c.Apps()}, nil - case extensions.Kind("Deployment"), apps.Kind("Deployment"): - return &DeploymentScaler{c.Extensions()}, nil + default: + return &GenericScaler{scalesGetter, gr}, nil } - return nil, fmt.Errorf("no scaler has been implemented for %q", kind) } // ScalePrecondition describes a condition that must be true for the scale to take place @@ -533,7 +535,7 @@ func (precondition *ScalePrecondition) validateGeneric(scale *autoscalingapi.Sca } // GenericScaler can update scales for resources in a particular namespace -// TODO(o0lyn0mial): when the work on GenericScaler is done, don't +// TODO(po0lyn0mial): when the work on GenericScaler is done, don't // export the GenericScaler. Instead use ScalerFor method for getting the Scaler // also update the UTs type GenericScaler struct { diff --git a/staging/src/k8s.io/client-go/scale/client.go b/staging/src/k8s.io/client-go/scale/client.go index 07c6098620b..a8c903d9eab 100644 --- a/staging/src/k8s.io/client-go/scale/client.go +++ b/staging/src/k8s.io/client-go/scale/client.go @@ -196,7 +196,6 @@ func (c *namespacedScaleClient) Update(resource schema.GroupResource, scale *aut Body(scaleUpdateBytes). Do() if err := result.Error(); err != nil { - panic(err) return nil, fmt.Errorf("could not update the scale for %s %s: %v", resource.String(), scale.Name, err) } diff --git a/test/e2e/apps/daemon_restart.go b/test/e2e/apps/daemon_restart.go index 85266680ee7..2319dfe5731 100644 --- a/test/e2e/apps/daemon_restart.go +++ b/test/e2e/apps/daemon_restart.go @@ -257,7 +257,7 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() { // that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC // to the same size achieves this, because the scale operation advances the RC's sequence number // and awaits it to be observed and reported back in the RC's status. - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods, true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods, true) // Only check the keys, the pods can be different if the kubelet updated it. // TODO: Can it really? @@ -288,9 +288,9 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() { restarter.kill() // This is best effort to try and create pods while the scheduler is down, // since we don't know exactly when it is restarted after the kill signal. 
diff --git a/test/e2e/apps/daemon_restart.go b/test/e2e/apps/daemon_restart.go
index 85266680ee7..2319dfe5731 100644
--- a/test/e2e/apps/daemon_restart.go
+++ b/test/e2e/apps/daemon_restart.go
@@ -257,7 +257,7 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() {
 		// that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
 		// to the same size achieves this, because the scale operation advances the RC's sequence number
 		// and awaits it to be observed and reported back in the RC's status.
-		framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods, true)
+		framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods, true)
 
 		// Only check the keys, the pods can be different if the kubelet updated it.
 		// TODO: Can it really?
@@ -288,9 +288,9 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() {
 		restarter.kill()
 		// This is best effort to try and create pods while the scheduler is down,
 		// since we don't know exactly when it is restarted after the kill signal.
-		framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods+5, false))
+		framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, false))
 		restarter.waitUp()
-		framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods+5, true))
+		framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, true))
 	})
 
 	It("Kubelet should not restart containers across restart", func() {
diff --git a/test/e2e/examples.go b/test/e2e/examples.go
index 7e377e203e4..6fa937c89f6 100644
--- a/test/e2e/examples.go
+++ b/test/e2e/examples.go
@@ -521,7 +521,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			Expect(err).NotTo(HaveOccurred())
 
 			By("scaling rethinkdb")
-			framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, "rethinkdb-rc", 2, true)
+			framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "rethinkdb-rc", 2, true)
 			checkDbInstances()
 
 			By("starting admin")
@@ -564,7 +564,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			Expect(err).NotTo(HaveOccurred())
 
 			By("scaling hazelcast")
-			framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, "hazelcast", 2, true)
+			framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "hazelcast", 2, true)
 			forEachPod("name", "hazelcast", func(pod v1.Pod) {
 				_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD
index 0ecad57686d..6a773a3957b 100644
--- a/test/e2e/framework/BUILD
+++ b/test/e2e/framework/BUILD
@@ -110,6 +110,7 @@ go_library(
         "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
        "//vendor/k8s.io/api/rbac/v1beta1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
@@ -132,6 +133,7 @@ go_library(
         "//vendor/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/k8s.io/client-go/discovery:go_default_library",
+        "//vendor/k8s.io/client-go/discovery/cached:go_default_library",
         "//vendor/k8s.io/client-go/dynamic:go_default_library",
         "//vendor/k8s.io/client-go/informers:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
@@ -139,6 +141,7 @@ go_library(
         "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/typed/rbac/v1beta1:go_default_library",
         "//vendor/k8s.io/client-go/rest:go_default_library",
+        "//vendor/k8s.io/client-go/scale:go_default_library",
         "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
         "//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
         "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
diff --git a/test/e2e/framework/deployment_util.go b/test/e2e/framework/deployment_util.go
index 23feda770d4..d5544e1998e 100644
--- a/test/e2e/framework/deployment_util.go
+++ b/test/e2e/framework/deployment_util.go
@@ -29,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
+	scaleclient "k8s.io/client-go/scale"
scaleclient "k8s.io/client-go/scale" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" @@ -178,8 +179,10 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er return err } -func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error { - return ScaleResource(clientset, internalClientset, ns, name, size, wait, extensionsinternal.Kind("Deployment")) +//TODO(p0lyn0mial): remove internalClientset and kind. +//TODO(p0lyn0mial): update the callers. +func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { + return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments")) } func RunDeployment(config testutils.DeploymentConfig) error { diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index e628accaa28..f5341d79c00 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -28,14 +28,19 @@ import ( "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/tools/clientcmd" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -67,6 +72,8 @@ type Framework struct { AggregatorClient *aggregatorclient.Clientset ClientPool dynamic.ClientPool + ScalesGetter scaleclient.ScalesGetter + SkipNamespaceCreation bool // Whether to skip creating a namespace Namespace *v1.Namespace // Every test has at least one namespace unless creation is skipped namespacesToDelete []*v1.Namespace // Some tests have more than one. @@ -161,6 +168,24 @@ func (f *Framework) BeforeEach() { f.AggregatorClient, err = aggregatorclient.NewForConfig(config) Expect(err).NotTo(HaveOccurred()) f.ClientPool = dynamic.NewClientPool(config, legacyscheme.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) + + // create scales getter, set GroupVersion and NegotiatedSerializer to default values + // as they are required when creating a REST client. 
+	if config.GroupVersion == nil {
+		config.GroupVersion = &schema.GroupVersion{}
+	}
+	if config.NegotiatedSerializer == nil {
+		config.NegotiatedSerializer = legacyscheme.Codecs
+	}
+	restClient, err := rest.RESTClientFor(config)
+	Expect(err).NotTo(HaveOccurred())
+	discoClient, err := discovery.NewDiscoveryClientForConfig(config)
+	Expect(err).NotTo(HaveOccurred())
+	cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient)
+	restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured)
+	resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient)
+	f.ScalesGetter = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver)
+
 	if ProviderIs("kubemark") && TestContext.KubemarkExternalKubeConfig != "" && TestContext.CloudConfig.KubemarkController == nil {
 		externalConfig, err := clientcmd.BuildConfigFromFlags("", TestContext.KubemarkExternalKubeConfig)
 		externalConfig.QPS = f.Options.ClientQPS
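The new `f.ScalesGetter` field also lets a test hit the scale subresource directly, without going through `framework.ScaleRC`. A sketch of a test-body fragment (resource and object names are made up), matching the `Get`/`Update` signatures visible in the client.go hunk above; it assumes the usual e2e imports plus `"k8s.io/apimachinery/pkg/runtime/schema"`:

```go
// Illustrative fragment inside a ginkgo test body.
gr := schema.GroupResource{Group: "", Resource: "replicationcontrollers"}
scale, err := f.ScalesGetter.Scales(f.Namespace.Name).Get(gr, "my-rc")
framework.ExpectNoError(err)

scale.Spec.Replicas = 5 // this is the autoscaling/v1 Scale object
_, err = f.ScalesGetter.Scales(f.Namespace.Name).Update(gr, scale)
framework.ExpectNoError(err)
```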
diff --git a/test/e2e/framework/rc_util.go b/test/e2e/framework/rc_util.go
index d0d1982b535..8bbdb6f4a9b 100644
--- a/test/e2e/framework/rc_util.go
+++ b/test/e2e/framework/rc_util.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
+	scaleclient "k8s.io/client-go/scale"
 	"k8s.io/kubernetes/pkg/api/testapi"
 	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -84,7 +85,9 @@ func RcByNameContainer(name string, replicas int32, image string, labels map[str
 
 // ScaleRCByLabels scales an RC via ns/label lookup. If replicas == 0 it waits till
 // none are running, otherwise it does what a synchronous scale operation would do.
-func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, ns string, l map[string]string, replicas uint) error {
+//TODO(p0lyn0mial): remove internalClientset.
+//TODO(p0lyn0mial): update the callers.
+func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns string, l map[string]string, replicas uint) error {
 	listOpts := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(l)).String()}
 	rcs, err := clientset.CoreV1().ReplicationControllers(ns).List(listOpts)
 	if err != nil {
@@ -96,7 +99,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl
 	Logf("Scaling %v RCs with labels %v in ns %v to %v replicas.", len(rcs.Items), l, ns, replicas)
 	for _, labelRC := range rcs.Items {
 		name := labelRC.Name
-		if err := ScaleRC(clientset, internalClientset, ns, name, replicas, false); err != nil {
+		if err := ScaleRC(clientset, internalClientset, scalesGetter, ns, name, replicas, false); err != nil {
 			return err
 		}
 		rc, err := clientset.CoreV1().ReplicationControllers(ns).Get(name, metav1.GetOptions{})
@@ -156,8 +159,10 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl
 	return DeleteResourceAndPods(clientset, internalClientset, api.Kind("ReplicationController"), ns, name)
 }
 
-func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error {
-	return ScaleResource(clientset, internalClientset, ns, name, size, wait, api.Kind("ReplicationController"))
+//TODO(p0lyn0mial): remove internalClientset.
+//TODO(p0lyn0mial): update the callers.
+func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
+	return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers"))
 }
 
 func RunRC(config testutils.RCConfig) error {
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index 637a9eace0b..8384c774af0 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -74,6 +74,7 @@ import (
 
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	clientset "k8s.io/client-go/kubernetes"
+	scaleclient "k8s.io/client-go/scale"
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	"k8s.io/kubernetes/pkg/api/testapi"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -2682,20 +2683,25 @@ func RemoveAvoidPodsOffNode(c clientset.Interface, nodeName string) {
 	ExpectNoError(err)
 }
 
-func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Scaler, error) {
-	return kubectl.ScalerFor(kind, internalClientset)
+//TODO(p0lyn0mial): remove internalClientset and kind
+func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (kubectl.Scaler, error) {
+	return kubectl.ScalerFor(kind, internalClientset, scalesGetter, gr)
 }
 
+//TODO(p0lyn0mial): remove internalClientset and kind.
+//TODO(p0lyn0mial): update the callers.
 func ScaleResource(
 	clientset clientset.Interface,
 	internalClientset internalclientset.Interface,
+	scalesGetter scaleclient.ScalesGetter,
 	ns, name string,
 	size uint,
 	wait bool,
 	kind schema.GroupKind,
+	gr schema.GroupResource,
 ) error {
 	By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
-	scaler, err := getScalerForKind(internalClientset, kind)
+	scaler, err := getScalerForKind(internalClientset, kind, scalesGetter, gr)
 	if err != nil {
 		return err
 	}
diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go
index 759599a9937..44712d0d4cf 100644
--- a/test/e2e/network/service.go
+++ b/test/e2e/network/service.go
@@ -1265,7 +1265,7 @@ var _ = SIGDescribe("Services", func() {
 		}
 
 		By("Scaling down replication controller to zero")
-		framework.ScaleRC(f.ClientSet, f.InternalClientset, t.Namespace, rcSpec.Name, 0, false)
+		framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false)
 
 		By("Update service to not tolerate unready services")
 		_, err = framework.UpdateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
diff --git a/test/e2e/scalability/BUILD b/test/e2e/scalability/BUILD
index 34dff1f866a..fc6e3cee361 100644
--- a/test/e2e/scalability/BUILD
+++ b/test/e2e/scalability/BUILD
@@ -1,9 +1,4 @@
-package(default_visibility = ["//visibility:public"])
-
-load(
-    "@io_bazel_rules_go//go:def.bzl",
-    "go_library",
-)
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
 
 go_library(
     name = "go_default_library",
@@ -14,7 +9,9 @@ go_library(
         "load.go",
     ],
     importpath = "k8s.io/kubernetes/test/e2e/scalability",
+    visibility = ["//visibility:public"],
     deps = [
+        "//pkg/api/legacyscheme:go_default_library",
         "//pkg/apis/batch:go_default_library",
         "//pkg/apis/core:go_default_library",
         "//pkg/apis/extensions:go_default_library",
@@ -26,6 +23,7 @@ go_library(
         "//vendor/github.com/onsi/gomega:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", @@ -38,8 +36,12 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/cached:go_default_library", + "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/transport:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", @@ -57,4 +59,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go index f10671e10b6..6e49d068599 100644 --- a/test/e2e/scalability/density.go +++ b/test/e2e/scalability/density.go @@ -528,7 +528,7 @@ var _ = SIGDescribe("Density", func() { podThroughput := 20 timeout := time.Duration(totalPods/podThroughput)*time.Second + 3*time.Minute // createClients is defined in load.go - clients, internalClients, err := createClients(numberOfCollections) + clients, internalClients, scalesClients, err := createClients(numberOfCollections) for i := 0; i < numberOfCollections; i++ { nsName := namespaces[i].Name secretNames := []string{} @@ -559,6 +559,7 @@ var _ = SIGDescribe("Density", func() { baseConfig := &testutils.RCConfig{ Client: clients[i], InternalClient: internalClients[i], + ScalesGetter: scalesClients[i], Image: framework.GetPauseImageName(f.ClientSet), Name: name, Namespace: nsName, @@ -590,7 +591,7 @@ var _ = SIGDescribe("Density", func() { } // Single client is running out of http2 connections in delete phase, hence we need more. - clients, internalClients, err = createClients(2) + clients, internalClients, _, err = createClients(2) dConfig := DensityTestConfig{ ClientSets: clients, diff --git a/test/e2e/scalability/load.go b/test/e2e/scalability/load.go index c696de42724..6e15dbc8803 100644 --- a/test/e2e/scalability/load.go +++ b/test/e2e/scalability/load.go @@ -28,14 +28,18 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/apis/batch" @@ -48,6 +52,8 @@ import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + "k8s.io/client-go/dynamic" + "k8s.io/kubernetes/pkg/api/legacyscheme" ) const ( @@ -309,9 +315,11 @@ var _ = SIGDescribe("Load capacity", func() { } }) -func createClients(numberOfClients int) ([]clientset.Interface, []internalclientset.Interface, error) { +func createClients(numberOfClients int) ([]clientset.Interface, []internalclientset.Interface, []scaleclient.ScalesGetter, error) { clients := make([]clientset.Interface, numberOfClients) internalClients := make([]internalclientset.Interface, numberOfClients) + scalesClients := make([]scaleclient.ScalesGetter, numberOfClients) + for i := 0; i < numberOfClients; i++ { config, err := framework.LoadConfig() Expect(err).NotTo(HaveOccurred()) @@ -327,11 +335,11 @@ func createClients(numberOfClients int) ([]clientset.Interface, []internalclient // each client here. transportConfig, err := config.TransportConfig() if err != nil { - return nil, nil, err + return nil, nil, nil, err } tlsConfig, err := transport.TLSConfigFor(transportConfig) if err != nil { - return nil, nil, err + return nil, nil, nil, err } config.Transport = utilnet.SetTransportDefaults(&http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -349,16 +357,37 @@ func createClients(numberOfClients int) ([]clientset.Interface, []internalclient c, err := clientset.NewForConfig(config) if err != nil { - return nil, nil, err + return nil, nil, nil, err } clients[i] = c internalClient, err := internalclientset.NewForConfig(config) if err != nil { - return nil, nil, err + return nil, nil, nil, err } internalClients[i] = internalClient + + // create scale client, if GroupVersion or NegotiatedSerializer are not set + // assign default values - these fields are mandatory (required by RESTClientFor). + if config.GroupVersion == nil { + config.GroupVersion = &schema.GroupVersion{} + } + if config.NegotiatedSerializer == nil { + config.NegotiatedSerializer = legacyscheme.Codecs + } + restClient, err := restclient.RESTClientFor(config) + if err != nil { + return nil, nil, nil, err + } + discoClient, err := discovery.NewDiscoveryClientForConfig(config) + if err != nil { + return nil, nil, nil, err + } + cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured) + resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient) + scalesClients[i] = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver) } - return clients, internalClients, nil + return clients, internalClients, scalesClients, nil } func computePodCounts(total int) (int, int, int) { @@ -405,12 +434,13 @@ func generateConfigs( // Create a number of clients to better simulate real usecase // where not everyone is using exactly the same client. 
 	rcsPerClient := 20
-	clients, internalClients, err := createClients((len(configs) + rcsPerClient - 1) / rcsPerClient)
+	clients, internalClients, scalesClients, err := createClients((len(configs) + rcsPerClient - 1) / rcsPerClient)
 	framework.ExpectNoError(err)
 	for i := 0; i < len(configs); i++ {
 		configs[i].SetClient(clients[i%len(clients)])
 		configs[i].SetInternalClient(internalClients[i%len(internalClients)])
+		configs[i].SetScalesClient(scalesClients[i%len(clients)])
 	}
 	for i := 0; i < len(secretConfigs); i++ {
 		secretConfigs[i].Client = clients[i%len(clients)]
 	}
@@ -590,7 +620,16 @@ func scaleResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, scaling
 	sleepUpTo(scalingTime)
 	newSize := uint(rand.Intn(config.GetReplicas()) + config.GetReplicas()/2)
 	framework.ExpectNoError(framework.ScaleResource(
-		config.GetClient(), config.GetInternalClient(), config.GetNamespace(), config.GetName(), newSize, true, config.GetKind()),
+		config.GetClient(),
+		config.GetInternalClient(),
+		config.GetScalesGetter(),
+		config.GetNamespace(),
+		config.GetName(),
+		newSize,
+		true,
+		config.GetKind(),
+		config.GetGroupResource(),
+	),
 		fmt.Sprintf("scaling %v %v", config.GetKind(), config.GetName()))
 
 	selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.GetName()}))
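Worth calling out: `ScaleResource` now takes both the `GroupKind` (to pick a typed scaler while those still exist) and the `GroupResource` (for the generic, scale-subresource path). A typical call after this PR, with illustrative namespace, name, and size; `api` aliases `k8s.io/kubernetes/pkg/apis/core`:

```go
// Illustrative fragment from a test body using the widened signature.
framework.ExpectNoError(framework.ScaleResource(
	f.ClientSet,
	f.InternalClientset, // slated for removal per the TODOs above
	f.ScalesGetter,
	f.Namespace.Name,
	"my-rc",
	3,    // new size
	true, // wait until the new size is observed
	api.Kind("ReplicationController"),
	api.Resource("replicationcontrollers"),
))
```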
diff --git a/test/e2e/scheduling/equivalence_cache_predicates.go b/test/e2e/scheduling/equivalence_cache_predicates.go
index 79aaf5d9d8b..3d551476399 100644
--- a/test/e2e/scheduling/equivalence_cache_predicates.go
+++ b/test/e2e/scheduling/equivalence_cache_predicates.go
@@ -155,7 +155,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
 		By("Trying to schedule another equivalent Pod should fail due to node label has been removed.")
 		// use scale to create another equivalent pod and wait for failure event
 		WaitForSchedulerAfterAction(f, func() error {
-			err := framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, affinityRCName, uint(replica+1), false)
+			err := framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false)
 			return err
 		}, affinityRCName, false)
 		// and this new pod should be rejected since node label has been updated
diff --git a/test/e2e/scheduling/priorities.go b/test/e2e/scheduling/priorities.go
index 1c34ea8998c..f3643b6cb13 100644
--- a/test/e2e/scheduling/priorities.go
+++ b/test/e2e/scheduling/priorities.go
@@ -196,7 +196,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
 
 		By(fmt.Sprintf("Scale the RC: %s to len(nodeList.Item)-1 : %v.", rc.Name, len(nodeList.Items)-1))
 
-		framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rc.Name, uint(len(nodeList.Items)-1), true)
+		framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rc.Name, uint(len(nodeList.Items)-1), true)
 		testPods, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{
 			LabelSelector: "name=scheduler-priority-avoid-pod",
 		})
diff --git a/test/e2e/scheduling/rescheduler.go b/test/e2e/scheduling/rescheduler.go
index 512e8b3c6f0..0d1107ccbe1 100644
--- a/test/e2e/scheduling/rescheduler.go
+++ b/test/e2e/scheduling/rescheduler.go
@@ -68,8 +68,8 @@ var _ = SIGDescribe("Rescheduler [Serial]", func() {
 		deployment := deployments.Items[0]
 		replicas := uint(*(deployment.Spec.Replicas))
 
-		err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, metav1.NamespaceSystem, deployment.Name, replicas+1, true)
-		defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, metav1.NamespaceSystem, deployment.Name, replicas, true))
+		err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas+1, true)
+		defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas, true))
 		framework.ExpectNoError(err)
 	})
 
@@ -80,7 +80,7 @@ func reserveAllCpu(f *framework.Framework, id string, millicores int) error {
 	replicas := millicores / 100
 	reserveCpu(f, id, 1, 100)
 
-	framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.Namespace.Name, id, uint(replicas), false))
+	framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id, uint(replicas), false))
 
 	for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) {
 		pods, err := framework.GetPodsInNamespace(f.ClientSet, f.Namespace.Name, framework.ImagePullerLabels)
diff --git a/test/integration/framework/BUILD b/test/integration/framework/BUILD
index 07da3bc1141..27a3fb8e412 100644
--- a/test/integration/framework/BUILD
+++ b/test/integration/framework/BUILD
@@ -22,13 +22,10 @@ go_library(
         "//pkg/api/legacyscheme:go_default_library",
         "//pkg/api/testapi:go_default_library",
         "//pkg/apis/batch:go_default_library",
-        "//pkg/apis/core:go_default_library",
         "//pkg/apis/policy/v1beta1:go_default_library",
-        "//pkg/client/clientset_generated/internalclientset:go_default_library",
         "//pkg/controller:go_default_library",
         "//pkg/controller/replication:go_default_library",
         "//pkg/generated/openapi:go_default_library",
-        "//pkg/kubectl:go_default_library",
         "//pkg/kubelet/client:go_default_library",
         "//pkg/master:go_default_library",
         "//pkg/util/env:go_default_library",
diff --git a/test/integration/framework/util.go b/test/integration/framework/util.go
index afb1d68961e..c9d42a99c48 100644
--- a/test/integration/framework/util.go
+++ b/test/integration/framework/util.go
@@ -19,22 +19,13 @@ limitations under the License.
 package framework
 
 import (
-	"io/ioutil"
 	"net/http/httptest"
 	"strings"
 	"testing"
-	"time"
-
-	"github.com/golang/glog"
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/kubernetes/pkg/api/testapi"
-	api "k8s.io/kubernetes/pkg/apis/core"
-	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
-	"k8s.io/kubernetes/pkg/kubectl"
 )
 
 const (
@@ -80,48 +71,3 @@ func CreateTestingNamespace(baseName string, apiserver *httptest.Server, t *test
 func DeleteTestingNamespace(ns *v1.Namespace, apiserver *httptest.Server, t *testing.T) {
 	// TODO: Remove all resources from a given namespace once we implement CreateTestingNamespace.
 }
-
-// RCFromManifest reads a .json file and returns the rc in it.
-func RCFromManifest(fileName string) *v1.ReplicationController {
-	data, err := ioutil.ReadFile(fileName)
-	if err != nil {
-		glog.Fatalf("Unexpected error reading rc manifest %v", err)
-	}
-	var controller v1.ReplicationController
-	if err := runtime.DecodeInto(testapi.Default.Codec(), data, &controller); err != nil {
-		glog.Fatalf("Unexpected error reading rc manifest %v", err)
-	}
-	return &controller
-}
-
-// StopRC stops the rc via kubectl's stop library
-func StopRC(rc *v1.ReplicationController, clientset internalclientset.Interface) error {
-	reaper, err := kubectl.ReaperFor(api.Kind("ReplicationController"), clientset)
-	if err != nil || reaper == nil {
-		return err
-	}
-	err = reaper.Stop(rc.Namespace, rc.Name, 0, nil)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-// ScaleRC scales the given rc to the given replicas.
-func ScaleRC(name, ns string, replicas int32, clientset internalclientset.Interface) (*api.ReplicationController, error) {
-	scaler, err := kubectl.ScalerFor(api.Kind("ReplicationController"), clientset)
-	if err != nil {
-		return nil, err
-	}
-	retry := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout}
-	waitForReplicas := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout}
-	err = scaler.Scale(ns, name, uint(replicas), nil, retry, waitForReplicas)
-	if err != nil {
-		return nil, err
-	}
-	scaled, err := clientset.Core().ReplicationControllers(ns).Get(name, metav1.GetOptions{})
-	if err != nil {
-		return nil, err
-	}
-	return scaled, nil
-}
diff --git a/test/utils/BUILD b/test/utils/BUILD
index a2e6045933f..da7eeadab95 100644
--- a/test/utils/BUILD
+++ b/test/utils/BUILD
@@ -44,6 +44,7 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
+        "//vendor/k8s.io/client-go/scale:go_default_library",
         "//vendor/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/client-go/util/workqueue:go_default_library",
     ],
diff --git a/test/utils/runners.go b/test/utils/runners.go
index 2eaf28e48e2..1d71a3eeb62 100644
--- a/test/utils/runners.go
+++ b/test/utils/runners.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
+	scaleclient "k8s.io/client-go/scale"
 	"k8s.io/client-go/util/workqueue"
 	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
 	api "k8s.io/kubernetes/pkg/apis/core"
@@ -105,16 +106,20 @@ type RunObjectConfig interface {
 	GetKind() schema.GroupKind
 	GetClient() clientset.Interface
 	GetInternalClient() internalclientset.Interface
+	GetScalesGetter() scaleclient.ScalesGetter
 	SetClient(clientset.Interface)
 	SetInternalClient(internalclientset.Interface)
+	SetScalesClient(scaleclient.ScalesGetter)
 	GetReplicas() int
 	GetLabelValue(string) (string, bool)
+	GetGroupResource() schema.GroupResource
 }
 
 type RCConfig struct {
 	Affinity       *v1.Affinity
 	Client         clientset.Interface
 	InternalClient internalclientset.Interface
+	ScalesGetter   scaleclient.ScalesGetter
 	Image          string
 	Command        []string
 	Name           string
@@ -277,6 +282,10 @@ func (config *DeploymentConfig) GetKind() schema.GroupKind {
 	return extensionsinternal.Kind("Deployment")
 }
 
+func (config *DeploymentConfig) GetGroupResource() schema.GroupResource {
+	return extensionsinternal.Resource("deployments")
+}
+
 func (config *DeploymentConfig) create() error {
 	deployment := &extensions.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
@@ -344,6 +353,10 @@ func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
 	return extensionsinternal.Kind("ReplicaSet")
 }
 
+func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource {
+	return extensionsinternal.Resource("replicasets")
+}
+
 func (config *ReplicaSetConfig) create() error {
 	rs := &extensions.ReplicaSet{
 		ObjectMeta: metav1.ObjectMeta{
@@ -411,6 +424,10 @@ func (config *JobConfig) GetKind() schema.GroupKind {
 	return batchinternal.Kind("Job")
 }
 
+func (config *JobConfig) GetGroupResource() schema.GroupResource {
+	return batchinternal.Resource("jobs")
+}
+
 func (config *JobConfig) create() error {
 	job := &batch.Job{
 		ObjectMeta: metav1.ObjectMeta{
@@ -482,6 +499,10 @@ func (config *RCConfig) GetKind() schema.GroupKind {
 	return api.Kind("ReplicationController")
 }
 
+func (config *RCConfig) GetGroupResource() schema.GroupResource {
+	return api.Resource("replicationcontrollers")
+}
+
 func (config *RCConfig) GetClient() clientset.Interface {
 	return config.Client
 }
@@ -490,6 +511,10 @@ func (config *RCConfig) GetInternalClient() internalclientset.Interface {
 	return config.InternalClient
 }
 
+func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter {
+	return config.ScalesGetter
+}
+
 func (config *RCConfig) SetClient(c clientset.Interface) {
 	config.Client = c
 }
@@ -498,6 +523,10 @@ func (config *RCConfig) SetInternalClient(c internalclientset.Interface) {
 	config.InternalClient = c
 }
 
+func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) {
+	config.ScalesGetter = getter
+}
+
 func (config *RCConfig) GetReplicas() int {
 	return config.Replicas
 }
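Since `DeploymentConfig`, `ReplicaSetConfig`, and `JobConfig` all embed `RCConfig`, they inherit `GetScalesGetter`/`SetScalesClient` from it and only have to supply their own `GetGroupResource`. A hypothetical fourth config type would follow the same pattern — `StatefulSetConfig` and the `appsinternal` alias below do not exist in this diff and are illustrative only:

```go
// Hypothetical: a StatefulSetConfig embedding RCConfig would only need this;
// appsinternal would alias "k8s.io/kubernetes/pkg/apis/apps".
func (config *StatefulSetConfig) GetGroupResource() schema.GroupResource {
	return appsinternal.Resource("statefulsets")
}
```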