mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 05:21:58 +00:00
fed: Enable cluster controller stop and refactor start for reuse
This commit is contained in:
parent
522aeb7f36
commit
6d656dfe4a
@ -156,9 +156,8 @@ func Run(s *options.CMServer) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
|
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
|
||||||
glog.Infof("Loading client config for cluster controller %q", "cluster-controller")
|
stopChan := wait.NeverStop
|
||||||
ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
|
minimizeLatency := false
|
||||||
glog.Infof("Running cluster controller")
|
|
||||||
|
|
||||||
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restClientCfg)
|
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restClientCfg)
|
||||||
serverResources, err := discoveryClient.ServerResources()
|
serverResources, err := discoveryClient.ServerResources()
|
||||||
@ -166,7 +165,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
|
|||||||
glog.Fatalf("Could not find resources from API Server: %v", err)
|
glog.Fatalf("Could not find resources from API Server: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go clustercontroller.NewClusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run()
|
clustercontroller.StartClusterController(restClientCfg, stopChan, s.ClusterMonitorPeriod.Duration)
|
||||||
|
|
||||||
if controllerEnabled(s.Controllers, serverResources, servicecontroller.ControllerName, servicecontroller.RequiredResources, true) {
|
if controllerEnabled(s.Controllers, serverResources, servicecontroller.ControllerName, servicecontroller.RequiredResources, true) {
|
||||||
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
|
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
federationv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
federationv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
clustercache "k8s.io/kubernetes/federation/client/cache"
|
clustercache "k8s.io/kubernetes/federation/client/cache"
|
||||||
@ -53,8 +54,17 @@ type ClusterController struct {
|
|||||||
clusterStore clustercache.StoreToClusterLister
|
clusterStore clustercache.StoreToClusterLister
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClusterController returns a new cluster controller
|
// StartClusterController starts a new cluster controller
|
||||||
func NewClusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController {
|
func StartClusterController(config *restclient.Config, stopChan <-chan struct{}, clusterMonitorPeriod time.Duration) {
|
||||||
|
restclient.AddUserAgent(config, "cluster-controller")
|
||||||
|
client := federationclientset.NewForConfigOrDie(config)
|
||||||
|
controller := newClusterController(client, clusterMonitorPeriod)
|
||||||
|
glog.Infof("Starting cluster controller")
|
||||||
|
controller.Run(stopChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newClusterController returns a new cluster controller
|
||||||
|
func newClusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController {
|
||||||
cc := &ClusterController{
|
cc := &ClusterController{
|
||||||
knownClusterSet: make(sets.String),
|
knownClusterSet: make(sets.String),
|
||||||
federationClient: federationClient,
|
federationClient: federationClient,
|
||||||
@ -112,15 +122,15 @@ func (cc *ClusterController) addToClusterSet(obj interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run begins watching and syncing.
|
// Run begins watching and syncing.
|
||||||
func (cc *ClusterController) Run() {
|
func (cc *ClusterController) Run(stopChan <-chan struct{}) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
go cc.clusterController.Run(wait.NeverStop)
|
go cc.clusterController.Run(stopChan)
|
||||||
// monitor cluster status periodically, in phase 1 we just get the health state from "/healthz"
|
// monitor cluster status periodically, in phase 1 we just get the health state from "/healthz"
|
||||||
go wait.Until(func() {
|
go wait.Until(func() {
|
||||||
if err := cc.UpdateClusterStatus(); err != nil {
|
if err := cc.UpdateClusterStatus(); err != nil {
|
||||||
glog.Errorf("Error monitoring cluster status: %v", err)
|
glog.Errorf("Error monitoring cluster status: %v", err)
|
||||||
}
|
}
|
||||||
}, cc.clusterMonitorPeriod, wait.NeverStop)
|
}, cc.clusterMonitorPeriod, stopChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) {
|
func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) {
|
||||||
|
@ -132,7 +132,7 @@ func TestUpdateClusterStatusOK(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
manager := NewClusterController(federationClientSet, 5)
|
manager := newClusterController(federationClientSet, 5)
|
||||||
err = manager.UpdateClusterStatus()
|
err = manager.UpdateClusterStatus()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Failed to Update Cluster Status: %v", err)
|
t.Errorf("Failed to Update Cluster Status: %v", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user