Merge pull request #19412 from mesosphere/jdef_sync_controllermanager

Auto commit by PR queue bot
k8s-merge-robot 2016-01-11 00:16:27 -08:00
commit dd81bf9f9d


@@ -36,10 +36,15 @@ import (
     "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/controller/daemon"
-    kendpoint "k8s.io/kubernetes/pkg/controller/endpoint"
+    "k8s.io/kubernetes/pkg/controller/deployment"
+    endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
+    "k8s.io/kubernetes/pkg/controller/gc"
+    "k8s.io/kubernetes/pkg/controller/job"
     namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
     nodecontroller "k8s.io/kubernetes/pkg/controller/node"
     persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume"
+    "k8s.io/kubernetes/pkg/controller/podautoscaler"
+    "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
     replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
     resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
     routecontroller "k8s.io/kubernetes/pkg/controller/route"
@@ -48,6 +53,7 @@ import (
     "k8s.io/kubernetes/pkg/healthz"
     "k8s.io/kubernetes/pkg/serviceaccount"
     "k8s.io/kubernetes/pkg/util"
+    "k8s.io/kubernetes/pkg/util/wait"
 
     "k8s.io/kubernetes/contrib/mesos/pkg/profile"
     kmendpoint "k8s.io/kubernetes/contrib/mesos/pkg/service"
@@ -120,14 +126,16 @@ func (s *CMServer) Run(_ []string) error {
         glog.Fatal(server.ListenAndServe())
     }()
 
-    endpoints := s.createEndpointController(kubeClient)
+    endpoints := s.createEndpointController(clientForUserAgentOrDie(*kubeconfig, "endpoint-controller"))
     go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
 
-    go replicationcontroller.NewReplicationManager(kubeClient, s.resyncPeriod, replicationcontroller.BurstReplicas).
+    go replicationcontroller.NewReplicationManager(clientForUserAgentOrDie(*kubeconfig, "replication-controller"), s.resyncPeriod, replicationcontroller.BurstReplicas).
         Run(s.ConcurrentRCSyncs, util.NeverStop)
 
-    go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod).
-        Run(s.ConcurrentDSCSyncs, util.NeverStop)
+    if s.TerminatedPodGCThreshold > 0 {
+        go gc.New(clientForUserAgentOrDie(*kubeconfig, "garbage-collector"), s.resyncPeriod, s.TerminatedPodGCThreshold).
+            Run(util.NeverStop)
+    }
 
     //TODO(jdef) should eventually support more cloud providers here
     if s.CloudProvider != mesos.ProviderName {
@@ -138,18 +146,18 @@ func (s *CMServer) Run(_ []string) error {
         glog.Fatalf("Cloud provider could not be initialized: %v", err)
     }
 
-    nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
+    nodeController := nodecontroller.NewNodeController(cloud, clientForUserAgentOrDie(*kubeconfig, "node-controller"),
         s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
         util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
         s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
     nodeController.Run(s.NodeSyncPeriod)
 
-    nodeStatusUpdaterController := node.NewStatusUpdater(kubeClient, s.NodeMonitorPeriod, time.Now)
+    nodeStatusUpdaterController := node.NewStatusUpdater(clientForUserAgentOrDie(*kubeconfig, "node-status-controller"), s.NodeMonitorPeriod, time.Now)
     if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil {
         glog.Fatalf("Failed to start node status update controller: %v", err)
     }
 
-    serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
+    serviceController := servicecontroller.New(cloud, clientForUserAgentOrDie(*kubeconfig, "service-controller"), s.ClusterName)
     if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
         glog.Errorf("Failed to start service controller: %v", err)
     }
@@ -159,33 +167,91 @@ func (s *CMServer) Run(_ []string) error {
         if !ok {
             glog.Fatal("Cloud provider must support routes if allocate-node-cidrs is set")
         }
-        routeController := routecontroller.New(routes, kubeClient, s.ClusterName, (*net.IPNet)(&s.ClusterCIDR))
+        routeController := routecontroller.New(routes, clientForUserAgentOrDie(*kubeconfig, "route-controller"), s.ClusterName, (*net.IPNet)(&s.ClusterCIDR))
         routeController.Run(s.NodeSyncPeriod)
     }
 
     go resourcequotacontroller.NewResourceQuotaController(
-        kubeClient, controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
+        clientForUserAgentOrDie(*kubeconfig, "resource-quota-controller"), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
 
-    namespaceController := namespacecontroller.NewNamespaceController(kubeClient, &unversioned.APIVersions{}, s.NamespaceSyncPeriod)
+    // If apiserver is not running we should wait for some time and fail only then. This is particularly
+    // important when we start apiserver and controller manager at the same time.
+    var versionStrings []string
+    err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
+        if versionStrings, err = client.ServerAPIVersions(kubeconfig); err == nil {
+            return true, nil
+        }
+        glog.Errorf("Failed to get api versions from server: %v", err)
+        return false, nil
+    })
+    if err != nil {
+        glog.Fatalf("Failed to get api versions from server: %v", err)
+    }
+    versions := &unversioned.APIVersions{Versions: versionStrings}
+
+    resourceMap, err := kubeClient.Discovery().ServerResources()
+    if err != nil {
+        glog.Fatalf("Failed to get supported resources from server: %v", err)
+    }
+
+    namespaceController := namespacecontroller.NewNamespaceController(clientForUserAgentOrDie(*kubeconfig, "namespace-controller"), &unversioned.APIVersions{}, s.NamespaceSyncPeriod)
     namespaceController.Run()
 
+    groupVersion := "extensions/v1beta1"
+    resources, found := resourceMap[groupVersion]
+    // TODO(k8s): this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver
+    if containsVersion(versions, groupVersion) && found {
+        glog.Infof("Starting %s apis", groupVersion)
+        if containsResource(resources, "horizontalpodautoscalers") {
+            glog.Infof("Starting horizontal pod controller.")
+            hpaClient := clientForUserAgentOrDie(*kubeconfig, "horizontal-pod-autoscaler")
+            metricsClient := metrics.NewHeapsterMetricsClient(
+                hpaClient,
+                metrics.DefaultHeapsterNamespace,
+                metrics.DefaultHeapsterScheme,
+                metrics.DefaultHeapsterService,
+                metrics.DefaultHeapsterPort,
+            )
+            podautoscaler.NewHorizontalController(hpaClient, hpaClient, hpaClient, metricsClient).
+                Run(s.HorizontalPodAutoscalerSyncPeriod)
+        }
+
+        if containsResource(resources, "daemonsets") {
+            glog.Infof("Starting daemon set controller")
+            go daemon.NewDaemonSetsController(clientForUserAgentOrDie(*kubeconfig, "daemon-set-controller"), s.resyncPeriod).
+                Run(s.ConcurrentDSCSyncs, util.NeverStop)
+        }
+
+        if containsResource(resources, "jobs") {
+            glog.Infof("Starting job controller")
+            go job.NewJobController(clientForUserAgentOrDie(*kubeconfig, "job-controller"), s.resyncPeriod).
+                Run(s.ConcurrentJobSyncs, util.NeverStop)
+        }
+
+        if containsResource(resources, "deployments") {
+            glog.Infof("Starting deployment controller")
+            go deployment.NewDeploymentController(clientForUserAgentOrDie(*kubeconfig, "deployment-controller"), s.resyncPeriod).
+                Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
+        }
+    }
+
     volumePlugins := kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)
     provisioner, err := kubecontrollermanager.NewVolumeProvisioner(cloud, s.VolumeConfigFlags)
     if err != nil {
         glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.")
     }
 
-    pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
+    pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-binder"), s.PVClaimBinderSyncPeriod)
     pvclaimBinder.Run()
 
-    pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud)
+    pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud)
     if err != nil {
         glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
     }
     pvRecycler.Run()
 
     if provisioner != nil {
-        pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(kubeClient), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud)
+        pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-controller")), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud)
         if err != nil {
             glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err)
         }
@@ -212,7 +278,7 @@ func (s *CMServer) Run(_ []string) error {
             glog.Errorf("Error reading key for service account token controller: %v", err)
         } else {
             serviceaccountcontroller.NewTokensController(
-                kubeClient,
+                clientForUserAgentOrDie(*kubeconfig, "tokens-controller"),
                 serviceaccountcontroller.TokensControllerOptions{
                     TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
                     RootCA:         rootCA,
@@ -222,19 +288,48 @@ func (s *CMServer) Run(_ []string) error {
     }
 
     serviceaccountcontroller.NewServiceAccountsController(
-        kubeClient,
+        clientForUserAgentOrDie(*kubeconfig, "service-account-controller"),
         serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
     ).Run()
 
     select {}
 }
 
+func clientForUserAgentOrDie(config client.Config, userAgent string) *client.Client {
+    fullUserAgent := client.DefaultKubernetesUserAgent() + "/" + userAgent
+    config.UserAgent = fullUserAgent
+    kubeClient, err := client.New(&config)
+    if err != nil {
+        glog.Fatalf("Invalid API configuration: %v", err)
+    }
+    return kubeClient
+}
+
 func (s *CMServer) createEndpointController(client *client.Client) kmendpoint.EndpointController {
     if s.UseHostPortEndpoints {
         glog.V(2).Infof("Creating hostIP:hostPort endpoint controller")
         return kmendpoint.NewEndpointController(client)
     }
     glog.V(2).Infof("Creating podIP:containerPort endpoint controller")
-    stockEndpointController := kendpoint.NewEndpointController(client, s.resyncPeriod)
+    stockEndpointController := endpointcontroller.NewEndpointController(client, s.resyncPeriod)
     return stockEndpointController
 }
+
+func containsVersion(versions *unversioned.APIVersions, version string) bool {
+    for ix := range versions.Versions {
+        if versions.Versions[ix] == version {
+            return true
+        }
+    }
+    return false
+}
+
+func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
+    for ix := range resources.APIResources {
+        resource := resources.APIResources[ix]
+        if resource.Name == resourceName {
+            return true
+        }
+    }
+    return false
+}
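On the recurring clientForUserAgentOrDie(*kubeconfig, "<controller-name>") calls: each controller now gets its own client whose User-Agent is the default controller-manager user agent with the controller name appended, so apiserver logs and per-client metrics can tell the controllers apart. Because the client.Config is passed by value, setting UserAgent inside the helper does not modify the shared kubeconfig. Below is a minimal sketch of the resulting header values; the base string is purely illustrative, since the actual output of client.DefaultKubernetesUserAgent() depends on the binary name, version and build metadata.

package main

import "fmt"

// userAgentFor mirrors the suffixing done by clientForUserAgentOrDie: it appends
// a per-controller name to the default Kubernetes user agent.
func userAgentFor(baseUserAgent, controller string) string {
    return baseUserAgent + "/" + controller
}

func main() {
    // Hypothetical base value; the real one comes from client.DefaultKubernetesUserAgent().
    base := "km/v1.2.0 (linux/amd64) kubernetes/dd81bf9"
    for _, c := range []string{"endpoint-controller", "replication-controller", "node-controller"} {
        fmt.Println(userAgentFor(base, c))
        // e.g. km/v1.2.0 (linux/amd64) kubernetes/dd81bf9/endpoint-controller
    }
}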