mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 23:37:01 +00:00
Merge pull request #18070 from gmarek/split-clients
Auto commit by PR queue bot
This commit is contained in:
commit
439019cbbb
@ -235,6 +235,16 @@ func (s *CMServer) ResyncPeriod() time.Duration {
|
|||||||
return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor)
|
return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func clientForUserAgentOrDie(config client.Config, userAgent string) *client.Client {
|
||||||
|
fullUserAgent := client.DefaultKubernetesUserAgent() + "/" + userAgent
|
||||||
|
config.UserAgent = fullUserAgent
|
||||||
|
kubeClient, err := client.New(&config)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("Invalid API configuration: %v", err)
|
||||||
|
}
|
||||||
|
return kubeClient
|
||||||
|
}
|
||||||
|
|
||||||
// Run runs the CMServer. This should never exit.
|
// Run runs the CMServer. This should never exit.
|
||||||
func (s *CMServer) Run(_ []string) error {
|
func (s *CMServer) Run(_ []string) error {
|
||||||
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
||||||
@ -268,14 +278,17 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
glog.Fatal(server.ListenAndServe())
|
glog.Fatal(server.ListenAndServe())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go endpointcontroller.NewEndpointController(kubeClient, s.ResyncPeriod).
|
go endpointcontroller.NewEndpointController(clientForUserAgentOrDie(*kubeconfig, "endpoint-controller"), s.ResyncPeriod).
|
||||||
Run(s.ConcurrentEndpointSyncs, util.NeverStop)
|
Run(s.ConcurrentEndpointSyncs, util.NeverStop)
|
||||||
|
|
||||||
go replicationcontroller.NewReplicationManager(kubeClient, s.ResyncPeriod, replicationcontroller.BurstReplicas).
|
go replicationcontroller.NewReplicationManager(
|
||||||
Run(s.ConcurrentRCSyncs, util.NeverStop)
|
clientForUserAgentOrDie(*kubeconfig, "replication-controller"),
|
||||||
|
s.ResyncPeriod,
|
||||||
|
replicationcontroller.BurstReplicas,
|
||||||
|
).Run(s.ConcurrentRCSyncs, util.NeverStop)
|
||||||
|
|
||||||
if s.TerminatedPodGCThreshold > 0 {
|
if s.TerminatedPodGCThreshold > 0 {
|
||||||
go gc.New(kubeClient, s.ResyncPeriod, s.TerminatedPodGCThreshold).
|
go gc.New(clientForUserAgentOrDie(*kubeconfig, "garbage-collector"), s.ResyncPeriod, s.TerminatedPodGCThreshold).
|
||||||
Run(util.NeverStop)
|
Run(util.NeverStop)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,13 +297,13 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
|
nodeController := nodecontroller.NewNodeController(cloud, clientForUserAgentOrDie(*kubeconfig, "node-controller"),
|
||||||
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
|
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
|
||||||
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
|
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
|
||||||
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
|
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
|
||||||
nodeController.Run(s.NodeSyncPeriod)
|
nodeController.Run(s.NodeSyncPeriod)
|
||||||
|
|
||||||
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
|
serviceController := servicecontroller.New(cloud, clientForUserAgentOrDie(*kubeconfig, "service-controller"), s.ClusterName)
|
||||||
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
|
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
|
||||||
glog.Errorf("Failed to start service controller: %v", err)
|
glog.Errorf("Failed to start service controller: %v", err)
|
||||||
}
|
}
|
||||||
@ -301,7 +314,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
} else if routes, ok := cloud.Routes(); !ok {
|
} else if routes, ok := cloud.Routes(); !ok {
|
||||||
glog.Warning("allocate-node-cidrs is set, but cloud provider does not support routes. Will not manage routes.")
|
glog.Warning("allocate-node-cidrs is set, but cloud provider does not support routes. Will not manage routes.")
|
||||||
} else {
|
} else {
|
||||||
routeController := routecontroller.New(routes, kubeClient, s.ClusterName, &s.ClusterCIDR)
|
routeController := routecontroller.New(routes, clientForUserAgentOrDie(*kubeconfig, "route-controller"), s.ClusterName, &s.ClusterCIDR)
|
||||||
routeController.Run(s.NodeSyncPeriod)
|
routeController.Run(s.NodeSyncPeriod)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -309,7 +322,8 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go resourcequotacontroller.NewResourceQuotaController(
|
go resourcequotacontroller.NewResourceQuotaController(
|
||||||
kubeClient, controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
|
clientForUserAgentOrDie(*kubeconfig, "resourcequota-controller"),
|
||||||
|
controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
|
||||||
|
|
||||||
// If apiserver is not running we should wait for some time and fail only then. This is particularly
|
// If apiserver is not running we should wait for some time and fail only then. This is particularly
|
||||||
// important when we start apiserver and controller manager at the same time.
|
// important when we start apiserver and controller manager at the same time.
|
||||||
@ -331,7 +345,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
glog.Fatalf("Failed to get supported resources from server: %v", err)
|
glog.Fatalf("Failed to get supported resources from server: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
namespacecontroller.NewNamespaceController(kubeClient, versions, s.NamespaceSyncPeriod).Run()
|
namespacecontroller.NewNamespaceController(clientForUserAgentOrDie(*kubeconfig, "namespace-controller"), versions, s.NamespaceSyncPeriod).Run()
|
||||||
|
|
||||||
groupVersion := "extensions/v1beta1"
|
groupVersion := "extensions/v1beta1"
|
||||||
resources, found := resourceMap[groupVersion]
|
resources, found := resourceMap[groupVersion]
|
||||||
@ -340,34 +354,41 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
glog.Infof("Starting %s apis", groupVersion)
|
glog.Infof("Starting %s apis", groupVersion)
|
||||||
if containsResource(resources, "horizontalpodautoscalers") {
|
if containsResource(resources, "horizontalpodautoscalers") {
|
||||||
glog.Infof("Starting horizontal pod controller.")
|
glog.Infof("Starting horizontal pod controller.")
|
||||||
metricsClient := metrics.NewHeapsterMetricsClient(kubeClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
|
hpaClient := clientForUserAgentOrDie(*kubeconfig, "horizontal-pod-autoscaler")
|
||||||
podautoscaler.NewHorizontalController(kubeClient, metricsClient).
|
metricsClient := metrics.NewHeapsterMetricsClient(
|
||||||
|
hpaClient,
|
||||||
|
metrics.DefaultHeapsterNamespace,
|
||||||
|
metrics.DefaultHeapsterScheme,
|
||||||
|
metrics.DefaultHeapsterService,
|
||||||
|
metrics.DefaultHeapsterPort,
|
||||||
|
)
|
||||||
|
podautoscaler.NewHorizontalController(hpaClient, metricsClient).
|
||||||
Run(s.HorizontalPodAutoscalerSyncPeriod)
|
Run(s.HorizontalPodAutoscalerSyncPeriod)
|
||||||
}
|
}
|
||||||
|
|
||||||
if containsResource(resources, "daemonsets") {
|
if containsResource(resources, "daemonsets") {
|
||||||
glog.Infof("Starting daemon set controller")
|
glog.Infof("Starting daemon set controller")
|
||||||
go daemon.NewDaemonSetsController(kubeClient, s.ResyncPeriod).
|
go daemon.NewDaemonSetsController(clientForUserAgentOrDie(*kubeconfig, "daemon-set-controller"), s.ResyncPeriod).
|
||||||
Run(s.ConcurrentDSCSyncs, util.NeverStop)
|
Run(s.ConcurrentDSCSyncs, util.NeverStop)
|
||||||
}
|
}
|
||||||
|
|
||||||
if containsResource(resources, "jobs") {
|
if containsResource(resources, "jobs") {
|
||||||
glog.Infof("Starting job controller")
|
glog.Infof("Starting job controller")
|
||||||
go job.NewJobController(kubeClient, s.ResyncPeriod).
|
go job.NewJobController(clientForUserAgentOrDie(*kubeconfig, "job-controller"), s.ResyncPeriod).
|
||||||
Run(s.ConcurrentJobSyncs, util.NeverStop)
|
Run(s.ConcurrentJobSyncs, util.NeverStop)
|
||||||
}
|
}
|
||||||
|
|
||||||
if containsResource(resources, "deployments") {
|
if containsResource(resources, "deployments") {
|
||||||
glog.Infof("Starting deployment controller")
|
glog.Infof("Starting deployment controller")
|
||||||
deployment.New(kubeClient).
|
deployment.New(clientForUserAgentOrDie(*kubeconfig, "deployment-controller")).
|
||||||
Run(s.DeploymentControllerSyncPeriod)
|
Run(s.DeploymentControllerSyncPeriod)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
|
pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-binder"), s.PVClaimBinderSyncPeriod)
|
||||||
pvclaimBinder.Run()
|
pvclaimBinder.Run()
|
||||||
|
|
||||||
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
|
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
|
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
|
||||||
}
|
}
|
||||||
@ -393,7 +414,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
glog.Errorf("Error reading key for service account token controller: %v", err)
|
glog.Errorf("Error reading key for service account token controller: %v", err)
|
||||||
} else {
|
} else {
|
||||||
serviceaccount.NewTokensController(
|
serviceaccount.NewTokensController(
|
||||||
kubeClient,
|
clientForUserAgentOrDie(*kubeconfig, "tokens-controller"),
|
||||||
serviceaccount.TokensControllerOptions{
|
serviceaccount.TokensControllerOptions{
|
||||||
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
||||||
RootCA: rootCA,
|
RootCA: rootCA,
|
||||||
@ -403,7 +424,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
serviceaccount.NewServiceAccountsController(
|
serviceaccount.NewServiceAccountsController(
|
||||||
kubeClient,
|
clientForUserAgentOrDie(*kubeconfig, "service-account-controller"),
|
||||||
serviceaccount.DefaultServiceAccountsControllerOptions(),
|
serviceaccount.DefaultServiceAccountsControllerOptions(),
|
||||||
).Run()
|
).Run()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user