From 716156348dca0a0f9536d32f3344dcaa30b74330 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Thu, 18 May 2017 16:26:27 +0530 Subject: [PATCH] Add leader election support for controller-manager --- federation/apis/federation/v1beta1/types.go | 4 + .../app/controllermanager.go | 111 ++++++++++++++++-- .../app/options/options.go | 4 + 3 files changed, 112 insertions(+), 7 deletions(-) diff --git a/federation/apis/federation/v1beta1/types.go b/federation/apis/federation/v1beta1/types.go index 7451438a7dc..b8c9cca3fdb 100644 --- a/federation/apis/federation/v1beta1/types.go +++ b/federation/apis/federation/v1beta1/types.go @@ -154,4 +154,8 @@ const ( // FederationClusterSelectorAnnotation is used to determine placement of objects on federated clusters FederationClusterSelectorAnnotation string = "federation.alpha.kubernetes.io/cluster-selector" + + // FederationOnlyClusterSelector is the cluster selector to indicate any object in + // federation having this annotation should not be synced to federated clusters. + FederationOnlyClusterSelector string = "federation.kubernetes.io/federation-control-plane=true" ) diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index a30df506b82..19345963061 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -23,15 +23,24 @@ import ( "net" "net/http" "net/http/pprof" + "os" goruntime "runtime" "strconv" + "time" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/healthz" utilflag "k8s.io/apiserver/pkg/util/flag" + "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" "k8s.io/kubernetes/federation/pkg/federatedtypes" @@ -41,6 +50,8 @@ import ( servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns" synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/version" @@ -52,6 +63,11 @@ import ( "k8s.io/client-go/discovery" ) +const ( + apiserverWaitTimeout = 2 * time.Minute + apiserverRetryInterval = 2 * time.Second +) + // NewControllerManagerCommand creates a *cobra.Command object with default parameters func NewControllerManagerCommand() *cobra.Command { s := options.NewCMServer() @@ -112,17 +128,68 @@ func Run(s *options.CMServer) error { glog.Fatal(server.ListenAndServe()) }() - run := func() { - err := StartControllers(s, restClientCfg) + federationClientset, err := federationclientset.NewForConfig(restclient.AddUserAgent(restClientCfg, "federation-controller-manager")) + if err != nil { + glog.Fatalf("Invalid API configuration: %v", err) + } + + run := func(stop <-chan struct{}) { + err := StartControllers(s, restClientCfg, stop) glog.Fatalf("error running controllers: %v", err) panic("unreachable") } - run() + + if !s.LeaderElection.LeaderElect { + run(nil) + // unreachable + } + + if err := ensureFederationNamespace(federationClientset, s.FederationOnlyNamespace); err != nil { + glog.Fatalf("Failed to ensure federation only namespace %s: %v", s.FederationOnlyNamespace, err) + } + + leaderElectionClient := kubernetes.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "leader-election")) + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClientset)) + recorder := eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: "controller-manager"}) + + id, err := os.Hostname() + if err != nil { + return err + } + + rl := resourcelock.ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{ + Namespace: s.FederationOnlyNamespace, + Name: "federation-controller-manager-leader-election", + Annotations: map[string]string{ + federationapi.FederationClusterSelectorAnnotation: federationapi.FederationOnlyClusterSelector, + }}, + Client: leaderElectionClient.CoreV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorder, + }, + } + + leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ + Lock: &rl, + LeaseDuration: s.LeaderElection.LeaseDuration.Duration, + RenewDeadline: s.LeaderElection.RenewDeadline.Duration, + RetryPeriod: s.LeaderElection.RetryPeriod.Duration, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + glog.Fatalf("leaderelection lost") + }, + }, + }) + panic("unreachable") } -func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { - stopChan := wait.NeverStop +func StartControllers(s *options.CMServer, restClientCfg *restclient.Config, stopChan <-chan struct{}) error { minimizeLatency := false discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restClientCfg) @@ -147,7 +214,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err glog.V(3).Infof("Loading client config for service controller %q", servicecontroller.UserAgentName) scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName)) serviceController := servicecontroller.New(scClientset) - go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop) + go serviceController.Run(s.ConcurrentServiceSyncs, stopChan) } adapterSpecificArgs := make(map[string]interface{}) @@ -171,7 +238,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, ingresscontroller.UserAgentName)) ingressController := ingresscontroller.NewIngressController(ingClientset) glog.V(3).Infof("Running ingress controller") - ingressController.Run(wait.NeverStop) + ingressController.Run(stopChan) } select {} @@ -219,3 +286,33 @@ func hasRequiredResources(serverResources []*metav1.APIResourceList, requiredRes } return true } + +func ensureFederationNamespace(clientset *federationclientset.Clientset, namespace string) error { + ns := v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Annotations: map[string]string{ + federationapi.FederationClusterSelectorAnnotation: federationapi.FederationOnlyClusterSelector, + }, + }, + } + // Probably this is the first operation by controller manager on api server. So retry the operation + // until timeout to handle scenario where api server is not yet ready. + err := wait.PollImmediate(apiserverRetryInterval, apiserverWaitTimeout, func() (bool, error) { + var err error + _, err = clientset.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}) + if err != nil { + if !errors.IsNotFound(err) { + glog.V(2).Infof("Failed to get namespace %s: %v", namespace, err) + return false, nil + } + _, err := clientset.CoreV1().Namespaces().Create(&ns) + if err != nil { + glog.V(2).Infof("Failed to create namespace %s: %v", namespace, err) + return false, nil + } + } + return true, nil + }) + return err +} diff --git a/federation/cmd/federation-controller-manager/app/options/options.go b/federation/cmd/federation-controller-manager/app/options/options.go index 7c1a87dae40..470d61e6052 100644 --- a/federation/cmd/federation-controller-manager/app/options/options.go +++ b/federation/cmd/federation-controller-manager/app/options/options.go @@ -83,6 +83,8 @@ type ControllerManagerConfiguration struct { // by cluster local hpas on local replicas, but too low a value can result in thrashing. // Higher values will result in slower response to scalibility conditions on local replicas. HpaScaleForbiddenWindow metav1.Duration `json:"HpaScaleForbiddenWindow"` + // pre-configured namespace name that would be created only in federation control plane + FederationOnlyNamespace string `json:"federationOnlyNamespaceName"` } // CMServer is the main context object for the controller manager. @@ -113,6 +115,7 @@ func NewCMServer() *CMServer { LeaderElection: leaderelectionconfig.DefaultLeaderElectionConfiguration(), Controllers: make(utilflag.ConfigurationMap), HpaScaleForbiddenWindow: metav1.Duration{Duration: 2 * time.Minute}, + FederationOnlyNamespace: "federation-only", }, } return &s @@ -144,5 +147,6 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "A set of key=value pairs that describe controller configuration "+ "to enable/disable specific controllers. Key should be the resource name (like services) and value should be true or false. "+ "For example: services=false,ingresses=false") + fs.StringVar(&s.FederationOnlyNamespace, "federation-only-namespace", s.FederationOnlyNamespace, "Name of the namespace that would be created only in federation control plane.") leaderelectionconfig.BindFlags(&s.LeaderElection, fs) }