From b83a31700340232c48bac7c82b4afc4adb8ff6f9 Mon Sep 17 00:00:00 2001 From: deads2k Date: Tue, 20 Sep 2016 09:43:11 -0400 Subject: [PATCH] switch controller manager to generated clientset --- .../app/controllermanager.go | 78 ++++++++++--------- cmd/libs/go2idl/client-gen/main.go | 1 + pkg/apis/policy/types.go | 1 - pkg/client/leaderelection/leaderelection.go | 16 ++-- .../leaderelection/leaderelection_test.go | 57 +++++++------- pkg/controller/disruption/disruption.go | 16 ++-- .../endpoint/endpoints_controller.go | 14 ++-- pkg/controller/petset/pet.go | 16 ++-- pkg/controller/petset/pet_set.go | 12 +-- pkg/controller/petset/pet_set_utils.go | 21 ++--- plugin/cmd/kube-scheduler/app/server.go | 17 ++-- 11 files changed, 125 insertions(+), 124 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index a1ba77ffac1..b9428db3c6e 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -38,11 +38,11 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/typed/dynamic" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" @@ -133,8 +133,7 @@ func Run(s *options.CMServer) error { // Override kubeconfig qps/burst settings from flags kubeconfig.QPS = s.KubeAPIQPS kubeconfig.Burst = int(s.KubeAPIBurst) - - kubeClient, err := client.New(kubeconfig) + kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "controller-manager")) if err != nil { glog.Fatalf("Invalid API configuration: %v", err) } @@ -159,11 +158,11 @@ func Run(s *options.CMServer) error { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"}) run := func(stop <-chan struct{}) { - err := StartControllers(s, kubeClient, kubeconfig, stop, recorder) + err := StartControllers(s, kubeconfig, stop, recorder) glog.Fatalf("error running controllers: %v", err) panic("unreachable") } @@ -183,12 +182,12 @@ func Run(s *options.CMServer) error { Namespace: "kube-system", Name: "kube-controller-manager", }, - Client: kubeClient, - Identity: id, - EventRecorder: recorder, - LeaseDuration: s.LeaderElection.LeaseDuration.Duration, - RenewDeadline: s.LeaderElection.RenewDeadline.Duration, - RetryPeriod: s.LeaderElection.RetryPeriod.Duration, + EndpointsClient: kubeClient, + Identity: id, + EventRecorder: recorder, + LeaseDuration: s.LeaderElection.LeaseDuration.Duration, + RenewDeadline: s.LeaderElection.RenewDeadline.Duration, + RetryPeriod: s.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { @@ -199,16 +198,20 @@ func Run(s *options.CMServer) error { panic("unreachable") } -func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}, recorder record.EventRecorder) error { - sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)()) +func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <-chan struct{}, recorder record.EventRecorder) error { + client := func(userAgent string) clientset.Interface { + return clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, userAgent)) + } + discoveryClient := client("controller-discovery").Discovery() + sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), ResyncPeriod(s)()) - go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). + go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")). Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) go replicationcontroller.NewReplicationManager( sharedInformers.Pods().Informer(), - clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), + client("replication-controller"), ResyncPeriod(s), replicationcontroller.BurstReplicas, int(s.LookupCacheSizeForRC), @@ -217,7 +220,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) if s.TerminatedPodGCThreshold > 0 { - go podgc.New(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), ResyncPeriod(s), int(s.TerminatedPodGCThreshold)). + go podgc.New(client("pod-garbage-collector"), ResyncPeriod(s), int(s.TerminatedPodGCThreshold)). Run(wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -235,7 +238,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if err != nil { glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) } - nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), + nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, client("node-controller"), s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) @@ -245,7 +248,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig nodeController.Run() time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - serviceController, err := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName) + serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName) if err != nil { glog.Errorf("Failed to start service controller: %v", err) } else { @@ -259,7 +262,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig } else if routes, ok := cloud.Routes(); !ok { glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") } else { - routeController := routecontroller.New(routes, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "route-controller")), s.ClusterName, clusterCIDR) + routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR) routeController.Run(s.RouteReconciliationPeriod.Duration) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -267,7 +270,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes) } - resourceQuotaControllerClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "resourcequota-controller")) + resourceQuotaControllerClient := client("resourcequota-controller") resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient) groupKindsToReplenish := []unversioned.GroupKind{ api.Kind("Pod"), @@ -303,13 +306,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig } versions := &unversioned.APIVersions{Versions: versionStrings} - resourceMap, err := kubeClient.Discovery().ServerResources() + resourceMap, err := discoveryClient.ServerResources() if err != nil { glog.Fatalf("Failed to get supported resources from server: %v", err) } // Find the list of namespaced resources via discovery that the namespace controller must manage - namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")) + namespaceKubeClient := client("namespace-controller") namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc) groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources() if err != nil { @@ -326,7 +329,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Starting %s apis", groupVersion) if containsResource(resources, "horizontalpodautoscalers") { glog.Infof("Starting horizontal pod controller.") - hpaClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "horizontal-pod-autoscaler")) + hpaClient := client("horizontal-pod-autoscaler") metricsClient := metrics.NewHeapsterMetricsClient( hpaClient, metrics.DefaultHeapsterNamespace, @@ -334,35 +337,35 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort, ) - go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient, metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration). + go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration). Run(wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") - go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), int(s.LookupCacheSizeForDaemonSet)). + go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "jobs") { glog.Infof("Starting job controller") - go job.NewJobController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))). + go job.NewJobController(sharedInformers.Pods().Informer(), client("job-controller")). Run(int(s.ConcurrentJobSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "deployments") { glog.Infof("Starting deployment controller") - go deployment.NewDeploymentController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)). + go deployment.NewDeploymentController(client("deployment-controller"), ResyncPeriod(s)). Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). + go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), client("replicaset-controller"), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). Run(int(s.ConcurrentRSSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -375,7 +378,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Starting %s apis", groupVersion) if containsResource(resources, "poddisruptionbudgets") { glog.Infof("Starting disruption controller") - go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), kubeClient).Run(wait.NeverStop) + go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } @@ -390,8 +393,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig resyncPeriod := ResyncPeriod(s)() go petset.NewPetSetController( sharedInformers.Pods().Informer(), - // TODO: Switch to using clientset - kubeClient, + client("petset-controller"), resyncPeriod, ).Run(1, wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) @@ -406,7 +408,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Starting scheduledjob controller") // // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset kubeconfig.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} - go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))). + go scheduledjob.NewScheduledJobController(client("scheduledjob-controller")). Run(wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) @@ -420,7 +422,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) } volumeController := persistentvolumecontroller.NewPersistentVolumeController( - clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")), + client("persistent-volume-binder"), s.PVClaimBinderSyncPeriod.Duration, alphaProvisioner, ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), @@ -437,7 +439,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig attachDetachController, attachDetachControllerErr := attachdetach.NewAttachDetachController( - clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")), + client("attachdetach-controller"), sharedInformers.Pods().Informer(), sharedInformers.Nodes().Informer(), sharedInformers.PersistentVolumeClaims().Informer(), @@ -460,7 +462,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Starting certificate request controller") resyncPeriod := ResyncPeriod(s)() certController, err := certcontroller.NewCertificateController( - clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "certificate-controller")), + client("certificate-controller"), resyncPeriod, s.ClusterSigningCertFile, s.ClusterSigningKeyFile, @@ -495,7 +497,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Errorf("Error reading key for service account token controller: %v", err) } else { go serviceaccountcontroller.NewTokensController( - clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")), + client("tokens-controller"), serviceaccountcontroller.TokensControllerOptions{ TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey), RootCA: rootCA, @@ -506,13 +508,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig } serviceaccountcontroller.NewServiceAccountsController( - clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-account-controller")), + client("service-account-controller"), serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), ).Run() time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) if s.EnableGarbageCollector { - gcClientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "generic-garbage-collector")) + gcClientset := client("generic-garbage-collector") groupVersionResources, err := gcClientset.Discovery().ServerPreferredResources() if err != nil { glog.Fatalf("Failed to get supported resources from server: %v", err) diff --git a/cmd/libs/go2idl/client-gen/main.go b/cmd/libs/go2idl/client-gen/main.go index 45ca829cbb0..a44b275f225 100644 --- a/cmd/libs/go2idl/client-gen/main.go +++ b/cmd/libs/go2idl/client-gen/main.go @@ -44,6 +44,7 @@ var ( "rbac/", "storage/", "apps/", + "policy/", }, "group/versions that client-gen will generate clients for. At most one version per group is allowed. Specified in the format \"group1/version1,group2/version2...\". Default to \"api/,extensions/,autoscaling/,batch/,rbac/\"") includedTypesOverrides = flag.StringSlice("included-types-overrides", []string{}, "list of group/version/type for which client should be generated. By default, client is generated for all types which have genclient=true in types.go. This overrides that. For each groupVersion in this list, only the types mentioned here will be included. The default check of genclient=true will be used for other group versions.") basePath = flag.String("input-base", "k8s.io/kubernetes/pkg/apis", "base path to look for the api group. Default to \"k8s.io/kubernetes/pkg/apis\"") diff --git a/pkg/apis/policy/types.go b/pkg/apis/policy/types.go index a79732a5d60..73b754042f5 100644 --- a/pkg/apis/policy/types.go +++ b/pkg/apis/policy/types.go @@ -50,7 +50,6 @@ type PodDisruptionBudgetStatus struct { } // +genclient=true -// +noMethods=true // PodDisruptionBudget is an object to define the max disruption that can be caused to a collection of pods type PodDisruptionBudget struct { diff --git a/pkg/client/leaderelection/leaderelection.go b/pkg/client/leaderelection/leaderelection.go index f69de06f6ff..ce9821c3956 100644 --- a/pkg/client/leaderelection/leaderelection.go +++ b/pkg/client/leaderelection/leaderelection.go @@ -58,8 +58,8 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/componentconfig" + coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -85,8 +85,8 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) { return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor") } - if lec.Client == nil { - return nil, fmt.Errorf("Client must not be nil.") + if lec.EndpointsClient == nil { + return nil, fmt.Errorf("EndpointsClient must not be nil.") } if lec.EventRecorder == nil { return nil, fmt.Errorf("EventRecorder must not be nil.") @@ -103,8 +103,8 @@ type LeaderElectionConfig struct { // Identity is a unique identifier of the leader elector. Identity string - Client client.Interface - EventRecorder record.EventRecorder + EndpointsClient coreclientset.EndpointsGetter + EventRecorder record.EventRecorder // LeaseDuration is the duration that non-leader candidates will // wait to force acquire leadership. This is measured against time of @@ -246,7 +246,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { AcquireTime: now, } - e, err := le.config.Client.Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name) + e, err := le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name) if err != nil { if !errors.IsNotFound(err) { glog.Errorf("error retrieving endpoint: %v", err) @@ -257,7 +257,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { if err != nil { return false } - _, err = le.config.Client.Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{ + _, err = le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: le.config.EndpointsMeta.Name, Namespace: le.config.EndpointsMeta.Namespace, @@ -312,7 +312,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { } e.Annotations[LeaderElectionRecordAnnotationKey] = string(leaderElectionRecordBytes) - _, err = le.config.Client.Endpoints(le.config.EndpointsMeta.Namespace).Update(e) + _, err = le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Update(e) if err != nil { glog.Errorf("err: %v", err) return false diff --git a/pkg/client/leaderelection/leaderelection_test.go b/pkg/client/leaderelection/leaderelection_test.go index 15b9b592601..5d90cca551b 100644 --- a/pkg/client/leaderelection/leaderelection_test.go +++ b/pkg/client/leaderelection/leaderelection_test.go @@ -29,8 +29,9 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/record" - "k8s.io/kubernetes/pkg/client/unversioned/testclient" + testcore "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/runtime" ) @@ -43,7 +44,7 @@ func TestTryAcquireOrRenew(t *testing.T) { observedTime time.Time reactors []struct { verb string - reaction testclient.ReactionFunc + reaction testcore.ReactionFunc } expectSuccess bool @@ -54,18 +55,18 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testclient.ReactionFunc + reaction testcore.ReactionFunc }{ { verb: "get", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { - return true, nil, errors.NewNotFound(api.Resource(action.(testclient.GetAction).GetResource()), action.(testclient.GetAction).GetName()) + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(testcore.GetAction).GetResource().GroupResource(), action.(testcore.GetAction).GetName()) }, }, { verb: "create", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(testclient.CreateAction).GetObject().(*api.Endpoints), nil + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil }, }, }, @@ -76,23 +77,23 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testclient.ReactionFunc + reaction testcore.ReactionFunc }{ { verb: "get", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { return true, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Namespace: action.GetNamespace(), - Name: action.(testclient.GetAction).GetName(), + Name: action.(testcore.GetAction).GetName(), }, }, nil }, }, { verb: "update", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(testclient.CreateAction).GetObject().(*api.Endpoints), nil + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil }, }, }, @@ -105,15 +106,15 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testclient.ReactionFunc + reaction testcore.ReactionFunc }{ { verb: "get", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { return true, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Namespace: action.GetNamespace(), - Name: action.(testclient.GetAction).GetName(), + Name: action.(testcore.GetAction).GetName(), Annotations: map[string]string{ LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, }, @@ -123,8 +124,8 @@ func TestTryAcquireOrRenew(t *testing.T) { }, { verb: "update", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(testclient.CreateAction).GetObject().(*api.Endpoints), nil + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil }, }, }, @@ -139,15 +140,15 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testclient.ReactionFunc + reaction testcore.ReactionFunc }{ { verb: "get", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { return true, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Namespace: action.GetNamespace(), - Name: action.(testclient.GetAction).GetName(), + Name: action.(testcore.GetAction).GetName(), Annotations: map[string]string{ LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, }, @@ -165,15 +166,15 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testclient.ReactionFunc + reaction testcore.ReactionFunc }{ { verb: "get", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { return true, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Namespace: action.GetNamespace(), - Name: action.(testclient.GetAction).GetName(), + Name: action.(testcore.GetAction).GetName(), Annotations: map[string]string{ LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`, }, @@ -183,8 +184,8 @@ func TestTryAcquireOrRenew(t *testing.T) { }, { verb: "update", - reaction: func(action testclient.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(testclient.CreateAction).GetObject().(*api.Endpoints), nil + reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil }, }, }, @@ -214,11 +215,11 @@ func TestTryAcquireOrRenew(t *testing.T) { }, }, } - c := &testclient.Fake{} + c := &fake.Clientset{} for _, reactor := range test.reactors { c.AddReactor(reactor.verb, "endpoints", reactor.reaction) } - c.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) { + c.AddReactor("*", "*", func(action testcore.Action) (bool, runtime.Object, error) { t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action) return true, nil, fmt.Errorf("uncreachable action") }) @@ -228,7 +229,7 @@ func TestTryAcquireOrRenew(t *testing.T) { observedRecord: test.observedRecord, observedTime: test.observedTime, } - le.config.Client = c + le.config.EndpointsClient = c.Core() if test.expectSuccess != le.tryAcquireOrRenew() { t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeded=%v]", i, !test.expectSuccess) diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index c297e09db5d..40aaf32535e 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -25,8 +25,10 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/policy" "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + policyclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/unversioned" "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" @@ -43,7 +45,7 @@ const statusUpdateRetries = 2 type updater func(*policy.PodDisruptionBudget) error type DisruptionController struct { - kubeClient *client.Client + kubeClient internalclientset.Interface pdbStore cache.Store pdbController *cache.Controller @@ -83,7 +85,7 @@ type controllerAndScale struct { // controllers and their scale. type podControllerFinder func(*api.Pod) ([]controllerAndScale, error) -func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient *client.Client) *DisruptionController { +func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient internalclientset.Interface) *DisruptionController { dc := &DisruptionController{ kubeClient: kubeClient, podController: podInformer.GetController(), @@ -124,10 +126,10 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient * dc.rcIndexer, dc.rcController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return dc.kubeClient.ReplicationControllers(api.NamespaceAll).List(options) + return dc.kubeClient.Core().ReplicationControllers(api.NamespaceAll).List(options) }, WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return dc.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(options) + return dc.kubeClient.Core().ReplicationControllers(api.NamespaceAll).Watch(options) }, }, &api.ReplicationController{}, @@ -256,7 +258,7 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) { glog.V(0).Infof("Starting disruption controller") if dc.kubeClient != nil { glog.V(0).Infof("Sending events to api server.") - dc.broadcaster.StartRecordingToSink(dc.kubeClient.Events("")) + dc.broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: dc.kubeClient.Core().Events("")}) } else { glog.V(0).Infof("No api server defined - no events will be sent to API server.") } @@ -589,7 +591,7 @@ func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, c // refresh tries to re-GET the given PDB. If there are any errors, it just // returns the old PDB. Intended to be used in a retry loop where it runs a // bounded number of times. -func refresh(pdbClient client.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget { +func refresh(pdbClient policyclientset.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget { newPdb, err := pdbClient.Get(pdb.Name) if err == nil { return newPdb diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index ba8d4500fbb..2ca3f9cec20 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -70,7 +70,7 @@ var ( ) // NewEndpointController returns a new *EndpointController. -func NewEndpointController(podInformer cache.SharedIndexInformer, client *clientset.Clientset) *EndpointController { +func NewEndpointController(podInformer cache.SharedIndexInformer, client clientset.Interface) *EndpointController { if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().GetRESTClient().GetRateLimiter()) } @@ -123,7 +123,7 @@ func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod c // EndpointController manages selector-based service endpoints. type EndpointController struct { - client *clientset.Clientset + client clientset.Interface serviceStore cache.StoreToServiceLister podStore cache.StoreToPodLister @@ -347,7 +347,7 @@ func (e *EndpointController) syncService(key string) error { // Don't retry, as the key isn't going to magically become understandable. return nil } - err = e.client.Endpoints(namespace).Delete(name, nil) + err = e.client.Core().Endpoints(namespace).Delete(name, nil) if err != nil && !errors.IsNotFound(err) { return err } @@ -450,7 +450,7 @@ func (e *EndpointController) syncService(key string) error { subsets = endpoints.RepackSubsets(subsets) // See if there's actually an update here. - currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) + currentEndpoints, err := e.client.Core().Endpoints(service.Namespace).Get(service.Name) if err != nil { if errors.IsNotFound(err) { currentEndpoints = &api.Endpoints{ @@ -496,10 +496,10 @@ func (e *EndpointController) syncService(key string) error { createEndpoints := len(currentEndpoints.ResourceVersion) == 0 if createEndpoints { // No previous endpoints, create them - _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) + _, err = e.client.Core().Endpoints(service.Namespace).Create(newEndpoints) } else { // Pre-existing - _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) + _, err = e.client.Core().Endpoints(service.Namespace).Update(newEndpoints) } if err != nil { if createEndpoints && errors.IsForbidden(err) { @@ -521,7 +521,7 @@ func (e *EndpointController) syncService(key string) error { // some stragglers could have been left behind if the endpoint controller // reboots). func (e *EndpointController) checkLeftoverEndpoints() { - list, err := e.client.Endpoints(api.NamespaceAll).List(api.ListOptions{}) + list, err := e.client.Core().Endpoints(api.NamespaceAll).List(api.ListOptions{}) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)) return diff --git a/pkg/controller/petset/pet.go b/pkg/controller/petset/pet.go index d2dd5f77c41..c8180860536 100644 --- a/pkg/controller/petset/pet.go +++ b/pkg/controller/petset/pet.go @@ -23,8 +23,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/runtime" "github.com/golang/glog" @@ -159,7 +159,7 @@ type petClient interface { // apiServerPetClient is a petset aware Kubernetes client. type apiServerPetClient struct { - c *client.Client + c internalclientset.Interface recorder record.EventRecorder petHealthChecker } @@ -167,7 +167,7 @@ type apiServerPetClient struct { // Get gets the pet in the pcb from the apiserver. func (p *apiServerPetClient) Get(pet *pcb) (*pcb, bool, error) { ns := pet.parent.Namespace - pod, err := podClient(p.c, ns).Get(pet.pod.Name) + pod, err := p.c.Core().Pods(ns).Get(pet.pod.Name) if errors.IsNotFound(err) { return nil, false, nil } @@ -181,7 +181,7 @@ func (p *apiServerPetClient) Get(pet *pcb) (*pcb, bool, error) { // Delete deletes the pet in the pcb from the apiserver. func (p *apiServerPetClient) Delete(pet *pcb) error { - err := podClient(p.c, pet.parent.Namespace).Delete(pet.pod.Name, nil) + err := p.c.Core().Pods(pet.parent.Namespace).Delete(pet.pod.Name, nil) if errors.IsNotFound(err) { err = nil } @@ -191,7 +191,7 @@ func (p *apiServerPetClient) Delete(pet *pcb) error { // Create creates the pet in the pcb. func (p *apiServerPetClient) Create(pet *pcb) error { - _, err := podClient(p.c, pet.parent.Namespace).Create(pet.pod) + _, err := p.c.Core().Pods(pet.parent.Namespace).Create(pet.pod) p.event(pet.parent, "Create", fmt.Sprintf("pet: %v", pet.pod.Name), err) return err } @@ -200,7 +200,7 @@ func (p *apiServerPetClient) Create(pet *pcb) error { // If the pod object of a pet which to be updated has been changed in server side, we // will get the actual value and set pet identity before retries. func (p *apiServerPetClient) Update(pet *pcb, expectedPet *pcb) (updateErr error) { - pc := podClient(p.c, pet.parent.Namespace) + pc := p.c.Core().Pods(pet.parent.Namespace) for i := 0; ; i++ { updatePod, needsUpdate, err := copyPetID(pet, expectedPet) @@ -227,12 +227,12 @@ func (p *apiServerPetClient) DeletePVCs(pet *pcb) error { } func (p *apiServerPetClient) getPVC(pvcName, pvcNamespace string) (*api.PersistentVolumeClaim, error) { - pvc, err := claimClient(p.c, pvcNamespace).Get(pvcName) + pvc, err := p.c.Core().PersistentVolumeClaims(pvcNamespace).Get(pvcName) return pvc, err } func (p *apiServerPetClient) createPVC(pvc *api.PersistentVolumeClaim) error { - _, err := claimClient(p.c, pvc.Namespace).Create(pvc) + _, err := p.c.Core().PersistentVolumeClaims(pvc.Namespace).Create(pvc) return err } diff --git a/pkg/controller/petset/pet_set.go b/pkg/controller/petset/pet_set.go index 4aa1ec3f11e..cf8471d98ef 100644 --- a/pkg/controller/petset/pet_set.go +++ b/pkg/controller/petset/pet_set.go @@ -26,8 +26,10 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/errors" @@ -50,7 +52,7 @@ const ( // PetSetController controls petsets. type PetSetController struct { - kubeClient *client.Client + kubeClient internalclientset.Interface // newSyncer returns an interface capable of syncing a single pet. // Abstracted out for testing. @@ -81,10 +83,10 @@ type PetSetController struct { } // NewPetSetController creates a new petset controller. -func NewPetSetController(podInformer cache.SharedIndexInformer, kubeClient *client.Client, resyncPeriod time.Duration) *PetSetController { +func NewPetSetController(podInformer cache.SharedIndexInformer, kubeClient internalclientset.Interface, resyncPeriod time.Duration) *PetSetController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "petset"}) pc := &apiServerPetClient{kubeClient, recorder, &defaultPetHealthChecker{}} @@ -309,7 +311,7 @@ func (psc *PetSetController) Sync(key string) error { } numPets, syncErr := psc.syncPetSet(&ps, petList) - if updateErr := updatePetCount(psc.kubeClient, ps, numPets); updateErr != nil { + if updateErr := updatePetCount(psc.kubeClient.Apps(), ps, numPets); updateErr != nil { glog.Infof("Failed to update replica count for petset %v/%v; requeuing; error: %v", ps.Namespace, ps.Name, updateErr) return errors.NewAggregate([]error{syncErr, updateErr}) } diff --git a/pkg/controller/petset/pet_set_utils.go b/pkg/controller/petset/pet_set_utils.go index dac6c378c9d..791350b64b5 100644 --- a/pkg/controller/petset/pet_set_utils.go +++ b/pkg/controller/petset/pet_set_utils.go @@ -23,7 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/client/cache" - client "k8s.io/kubernetes/pkg/client/unversioned" + appsclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/unversioned" "k8s.io/kubernetes/pkg/controller" "github.com/golang/glog" @@ -44,37 +44,26 @@ func (o overlappingPetSets) Less(i, j int) bool { } // updatePetCount attempts to update the Status.Replicas of the given PetSet, with a single GET/PUT retry. -func updatePetCount(kubeClient *client.Client, ps apps.PetSet, numPets int) (updateErr error) { - if ps.Status.Replicas == numPets || kubeClient == nil { +func updatePetCount(psClient appsclientset.PetSetsGetter, ps apps.PetSet, numPets int) (updateErr error) { + if ps.Status.Replicas == numPets || psClient == nil { return nil } - psClient := kubeClient.Apps().PetSets(ps.Namespace) var getErr error for i, ps := 0, &ps; ; i++ { glog.V(4).Infof(fmt.Sprintf("Updating replica count for PetSet: %s/%s, ", ps.Namespace, ps.Name) + fmt.Sprintf("replicas %d->%d (need %d), ", ps.Status.Replicas, numPets, ps.Spec.Replicas)) ps.Status = apps.PetSetStatus{Replicas: numPets} - _, updateErr = psClient.UpdateStatus(ps) + _, updateErr = psClient.PetSets(ps.Namespace).UpdateStatus(ps) if updateErr == nil || i >= statusUpdateRetries { return updateErr } - if ps, getErr = psClient.Get(ps.Name); getErr != nil { + if ps, getErr = psClient.PetSets(ps.Namespace).Get(ps.Name); getErr != nil { return getErr } } } -// claimClient returns the pvcClient for the given kubeClient/ns. -func claimClient(kubeClient *client.Client, ns string) client.PersistentVolumeClaimInterface { - return kubeClient.PersistentVolumeClaims(ns) -} - -// podClient returns the given podClient for the given kubeClient/ns. -func podClient(kubeClient *client.Client, ns string) client.PodInterface { - return kubeClient.Pods(ns) -} - // unhealthyPetTracker tracks unhealthy pets for petsets. type unhealthyPetTracker struct { pc petClient diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index bf7e39ce101..5977fa84e71 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -27,6 +27,7 @@ import ( "strconv" "k8s.io/kubernetes/pkg/api" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -89,6 +90,10 @@ func Run(s *options.SchedulerServer) error { if err != nil { glog.Fatalf("Invalid API configuration: %v", err) } + kubeClientset, err := clientset.NewForConfig(kubeconfig) + if err != nil { + glog.Fatalf("Invalid API configuration: %v", err) + } go func() { mux := http.NewServeMux() @@ -144,12 +149,12 @@ func Run(s *options.SchedulerServer) error { Namespace: "kube-system", Name: "kube-scheduler", }, - Client: kubeClient, - Identity: id, - EventRecorder: config.Recorder, - LeaseDuration: s.LeaderElection.LeaseDuration.Duration, - RenewDeadline: s.LeaderElection.RenewDeadline.Duration, - RetryPeriod: s.LeaderElection.RetryPeriod.Duration, + EndpointsClient: kubeClientset.Core(), + Identity: id, + EventRecorder: config.Recorder, + LeaseDuration: s.LeaderElection.LeaseDuration.Duration, + RenewDeadline: s.LeaderElection.RenewDeadline.Duration, + RetryPeriod: s.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() {