diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 29ca3dceb67..a66b34b3f99 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -156,9 +156,8 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { - glog.Infof("Loading client config for cluster controller %q", "cluster-controller") - ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) - glog.Infof("Running cluster controller") + stopChan := wait.NeverStop + minimizeLatency := false discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restClientCfg) serverResources, err := discoveryClient.ServerResources() @@ -166,7 +165,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err glog.Fatalf("Could not find resources from API Server: %v", err) } - go clustercontroller.NewClusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run() + clustercontroller.StartClusterController(restClientCfg, stopChan, s.ClusterMonitorPeriod.Duration) if controllerEnabled(s.Controllers, serverResources, servicecontroller.ControllerName, servicecontroller.RequiredResources, true) { dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile) diff --git a/federation/pkg/federation-controller/cluster/clustercontroller.go b/federation/pkg/federation-controller/cluster/clustercontroller.go index 206125a1d4f..422e0aabe58 100644 --- a/federation/pkg/federation-controller/cluster/clustercontroller.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" federationv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" clustercache "k8s.io/kubernetes/federation/client/cache" @@ -53,8 +54,17 @@ type ClusterController struct { clusterStore clustercache.StoreToClusterLister } -// NewClusterController returns a new cluster controller -func NewClusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController { +// StartClusterController starts a new cluster controller +func StartClusterController(config *restclient.Config, stopChan <-chan struct{}, clusterMonitorPeriod time.Duration) { + restclient.AddUserAgent(config, "cluster-controller") + client := federationclientset.NewForConfigOrDie(config) + controller := newClusterController(client, clusterMonitorPeriod) + glog.Infof("Starting cluster controller") + controller.Run(stopChan) +} + +// newClusterController returns a new cluster controller +func newClusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController { cc := &ClusterController{ knownClusterSet: make(sets.String), federationClient: federationClient, @@ -112,15 +122,15 @@ func (cc *ClusterController) addToClusterSet(obj interface{}) { } // Run begins watching and syncing. -func (cc *ClusterController) Run() { +func (cc *ClusterController) Run(stopChan <-chan struct{}) { defer utilruntime.HandleCrash() - go cc.clusterController.Run(wait.NeverStop) + go cc.clusterController.Run(stopChan) // monitor cluster status periodically, in phase 1 we just get the health state from "/healthz" go wait.Until(func() { if err := cc.UpdateClusterStatus(); err != nil { glog.Errorf("Error monitoring cluster status: %v", err) } - }, cc.clusterMonitorPeriod, wait.NeverStop) + }, cc.clusterMonitorPeriod, stopChan) } func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) { diff --git a/federation/pkg/federation-controller/cluster/clustercontroller_test.go b/federation/pkg/federation-controller/cluster/clustercontroller_test.go index 5404c6dbea8..5d4b04ca590 100644 --- a/federation/pkg/federation-controller/cluster/clustercontroller_test.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller_test.go @@ -132,7 +132,7 @@ func TestUpdateClusterStatusOK(t *testing.T) { } } - manager := NewClusterController(federationClientSet, 5) + manager := newClusterController(federationClientSet, 5) err = manager.UpdateClusterStatus() if err != nil { t.Errorf("Failed to Update Cluster Status: %v", err)